""" The Frequent Pattern Growth algorithm (FP-Growth) is a widely used data mining technique for discovering frequent itemsets in large transaction databases. It overcomes some of the limitations of traditional methods such as Apriori by efficiently constructing the FP-Tree WIKI: https://athena.ecs.csus.edu/~mei/associationcw/FpGrowth.html Examples: https://www.javatpoint.com/fp-growth-algorithm-in-data-mining """ from __future__ import annotations from dataclasses import dataclass, field @dataclass class TreeNode: """ A node in a Frequent Pattern tree. Args: name: The name of this node. num_occur: The number of occurrences of the node. parent_node: The parent node. Example: >>> parent = TreeNode("Parent", 1, None) >>> child = TreeNode("Child", 2, parent) >>> child.name 'Child' >>> child.count 2 """ name: str count: int parent: TreeNode | None = None children: dict[str, TreeNode] = field(default_factory=dict) node_link: TreeNode | None = None def __repr__(self) -> str: return f"TreeNode({self.name!r}, {self.count!r}, {self.parent!r})" def inc(self, num_occur: int) -> None: self.count += num_occur def disp(self, ind: int = 1) -> None: print(f"{' ' * ind} {self.name} {self.count}") for child in self.children.values(): child.disp(ind + 1) def create_tree(data_set: list, min_sup: int = 1) -> tuple[TreeNode, dict]: """ Create Frequent Pattern tree Args: data_set: A list of transactions, where each transaction is a list of items. min_sup: The minimum support threshold. Items with support less than this will be pruned. Default is 1. Returns: The root of the FP-Tree. header_table: The header table dictionary with item information. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> fp_tree TreeNode('Null Set', 1, None) >>> len(header_table) 4 >>> header_table["A"] [[4, None], TreeNode('A', 4, TreeNode('Null Set', 1, None))] >>> header_table["E"][1] # doctest: +NORMALIZE_WHITESPACE TreeNode('E', 1, TreeNode('B', 3, TreeNode('A', 4, TreeNode('Null Set', 1, None)))) >>> sorted(header_table) ['A', 'B', 'C', 'E'] >>> fp_tree.name 'Null Set' >>> sorted(fp_tree.children) ['A', 'B'] >>> fp_tree.children['A'].name 'A' >>> sorted(fp_tree.children['A'].children) ['B', 'C'] """ header_table: dict = {} for trans in data_set: for item in trans: header_table[item] = header_table.get(item, [0, None]) header_table[item][0] += 1 for k in list(header_table): if header_table[k][0] < min_sup: del header_table[k] if not (freq_item_set := set(header_table)): return TreeNode("Null Set", 1, None), {} for key, value in header_table.items(): header_table[key] = [value, None] fp_tree = TreeNode("Null Set", 1, None) # Parent is None for the root node for tran_set in data_set: local_d = { item: header_table[item][0] for item in tran_set if item in freq_item_set } if local_d: sorted_items = sorted( local_d.items(), key=lambda item_info: item_info[1], reverse=True ) ordered_items = [item[0] for item in sorted_items] update_tree(ordered_items, fp_tree, header_table, 1) return fp_tree, header_table def update_tree(items: list, in_tree: TreeNode, header_table: dict, count: int) -> None: """ Update the FP-Tree with a transaction. Args: items: List of items in the transaction. in_tree: The current node in the FP-Tree. header_table: The header table dictionary with item information. count: The count of the transaction. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> fp_tree TreeNode('Null Set', 1, None) >>> transaction = ['A', 'B', 'E'] >>> update_tree(transaction, fp_tree, header_table, 1) >>> fp_tree TreeNode('Null Set', 1, None) >>> fp_tree.children['A'].children['B'].children['E'].children {} >>> fp_tree.children['A'].children['B'].children['E'].count 2 >>> header_table['E'][1].name 'E' """ if items[0] in in_tree.children: in_tree.children[items[0]].inc(count) else: in_tree.children[items[0]] = TreeNode(items[0], count, in_tree) if header_table[items[0]][1] is None: header_table[items[0]][1] = in_tree.children[items[0]] else: update_header(header_table[items[0]][1], in_tree.children[items[0]]) if len(items) > 1: update_tree(items[1:], in_tree.children[items[0]], header_table, count) def update_header(node_to_test: TreeNode, target_node: TreeNode) -> TreeNode: """ Update the header table with a node link. Args: node_to_test: The node to be updated in the header table. target_node: The node to link to. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> fp_tree TreeNode('Null Set', 1, None) >>> node1 = TreeNode("A", 3, None) >>> node2 = TreeNode("B", 4, None) >>> node1 TreeNode('A', 3, None) >>> node1 = update_header(node1, node2) >>> node1 TreeNode('A', 3, None) >>> node1.node_link TreeNode('B', 4, None) >>> node2.node_link is None True """ while node_to_test.node_link is not None: node_to_test = node_to_test.node_link if node_to_test.node_link is None: node_to_test.node_link = target_node # Return the updated node return node_to_test def ascend_tree(leaf_node: TreeNode, prefix_path: list[str]) -> None: """ Ascend the FP-Tree from a leaf node to its root, adding item names to the prefix path. Args: leaf_node: The leaf node to start ascending from. prefix_path: A list to store the item as they are ascended. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> path = [] >>> ascend_tree(fp_tree.children['A'], path) >>> path # ascending from a leaf node 'A' ['A'] """ if leaf_node.parent is not None: prefix_path.append(leaf_node.name) ascend_tree(leaf_node.parent, prefix_path) def find_prefix_path(base_pat: frozenset, tree_node: TreeNode | None) -> dict: # noqa: ARG001 """ Find the conditional pattern base for a given base pattern. Args: base_pat: The base pattern for which to find the conditional pattern base. tree_node: The node in the FP-Tree. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> fp_tree TreeNode('Null Set', 1, None) >>> len(header_table) 4 >>> base_pattern = frozenset(['A']) >>> sorted(find_prefix_path(base_pattern, fp_tree.children['A'])) [] """ cond_pats: dict = {} while tree_node is not None: prefix_path: list = [] ascend_tree(tree_node, prefix_path) if len(prefix_path) > 1: cond_pats[frozenset(prefix_path[1:])] = tree_node.count tree_node = tree_node.node_link return cond_pats def mine_tree( in_tree: TreeNode, # noqa: ARG001 header_table: dict, min_sup: int, pre_fix: set, freq_item_list: list, ) -> None: """ Mine the FP-Tree recursively to discover frequent itemsets. Args: in_tree: The FP-Tree to mine. header_table: The header table dictionary with item information. min_sup: The minimum support threshold. pre_fix: A set of items as a prefix for the itemsets being mined. freq_item_list: A list to store the frequent itemsets. Example: >>> data_set = [ ... ['A', 'B', 'C'], ... ['A', 'C'], ... ['A', 'B', 'E'], ... ['A', 'B', 'C', 'E'], ... ['B', 'E'] ... ] >>> min_sup = 2 >>> fp_tree, header_table = create_tree(data_set, min_sup) >>> fp_tree TreeNode('Null Set', 1, None) >>> frequent_itemsets = [] >>> mine_tree(fp_tree, header_table, min_sup, set([]), frequent_itemsets) >>> expe_itm = [{'C'}, {'C', 'A'}, {'E'}, {'A', 'E'}, {'E', 'B'}, {'A'}, {'B'}] >>> all(expected in frequent_itemsets for expected in expe_itm) True """ sorted_items = sorted(header_table.items(), key=lambda item_info: item_info[1][0]) big_l = [item[0] for item in sorted_items] for base_pat in big_l: new_freq_set = pre_fix.copy() new_freq_set.add(base_pat) freq_item_list.append(new_freq_set) cond_patt_bases = find_prefix_path(base_pat, header_table[base_pat][1]) my_cond_tree, my_head = create_tree(list(cond_patt_bases), min_sup) if my_head is not None: # Pass header_table[base_pat][1] as node_to_test to update_header header_table[base_pat][1] = update_header( header_table[base_pat][1], my_cond_tree ) mine_tree(my_cond_tree, my_head, min_sup, new_freq_set, freq_item_list) if __name__ == "__main__": from doctest import testmod testmod() data_set: list[frozenset] = [ frozenset(["bread", "milk", "cheese"]), frozenset(["bread", "milk"]), frozenset(["bread", "diapers"]), frozenset(["bread", "milk", "diapers"]), frozenset(["milk", "diapers"]), frozenset(["milk", "cheese"]), frozenset(["diapers", "cheese"]), frozenset(["bread", "milk", "cheese", "diapers"]), ] print(f"{len(data_set) = }") fp_tree, header_table = create_tree(data_set, min_sup=3) print(f"{fp_tree = }") print(f"{len(header_table) = }") freq_items: list = [] mine_tree(fp_tree, header_table, 3, set(), freq_items) print(f"{freq_items = }")