#!/usr/bin/env python3
# -*- coding:utf-8 -*-
"""
This module provides the built-in co-reference detection driver.
"""
__author__ = 'Mu Yang <http://muyang.pro>'
__copyright__ = '2018-2020 CKIP Lab'
__license__ = 'CC BY-NC-SA 4.0'
import numpy as _np
from treelib import (
Tree as _Tree,
)
from ckipnlp.container import (
TextParagraph as _TextParagraph,
SegParagraph as _SegParagraph,
ParsedParagraph as _ParsedParagraph,
ParsedTree as _ParsedTree,
NerParagraph as _NerParagraph,
CorefToken as _CorefToken,
CorefSentence as _CorefSentence,
CorefParagraph as _CorefParagraph,
)
from ckipnlp.data.parsed import (
APPOSITION_ROLES as _APPOSITION_ROLES,
)
from ckipnlp.data.coref import (
HUMAN_WORDS as _HUMAN_WORDS,
PRONOUN_3RD_WORDS as _PRONOUN_3RD_WORDS,
SELF_WORDS as _SELF_WORDS,
)
from .base import (
BaseDriver as _BaseDriver,
DriverType as _DriverType,
DriverKind as _DriverKind,
)
################################################################################################################################
class CkipCorefChunker(_BaseDriver):  # pylint: disable=too-few-public-methods
    """The CKIP co-reference detection driver.

    The driver converts every parsed sentence into a :class:`ParsedTree`,
    links co-reference *targets* to *sources* in an auxiliary ``treelib``
    tree, and finally renders the result as a :class:`CorefParagraph`.

    Throughout this class a node is addressed by the key
    ``(tree_id, node_id)``: ``tree_id`` is the index of the sentence in the
    paragraph and ``node_id`` is the identifier of the node inside that
    sentence's tree. Negative ``node_id`` values (``-1`` and ``-2``) do not
    refer to real nodes; they are placeholders for a zero pronoun (see
    :meth:`_get_targets`).
    """

    driver_type = _DriverType.COREF_CHUNKER
    driver_kind = _DriverKind.BUILTIN

    def _call(self, *, parsed):
        """Detect co-references of a parsed paragraph.

        Parameters
        ----------
            parsed : :class:`ParsedParagraph`
                the parsed sentences.

        Returns
        -------
            :class:`CorefParagraph`
                the co-reference result, one sentence per input sentence.
        """
        assert isinstance(parsed, _ParsedParagraph)

        # Convert to tree structure
        tree_list = list(map(_ParsedTree.from_text, parsed))

        # Find co-reference
        coref_tree = self._get_coref(tree_list)

        # Get results
        return self._get_result(tree_list, coref_tree=coref_tree)

    def _init(self):
        pass

    @classmethod
    def _get_coref(cls, tree_list):
        """Build the co-reference tree of a list of parsed trees.

        Parameters
        ----------
            tree_list : list of :class:`ParsedTree`
                the parsed trees, one per sentence.

        Returns
        -------
            :class:`treelib.Tree`
                a tree whose root has the identifier ``0``. Every other node
                is identified by ``(tree_id, node_id)`` and carries ``True``
                as data if it is a source and ``False`` if it is a target.
                Each child of the root heads one co-reference chain.

        Notes
        -----
            Within a sentence the nodes are visited in ascending ``node_id``.

            * A source becomes a child of the first source that had the same
              word, or a new child of the root if the word is new.
            * A zero-pronoun target (negative ``node_id``) is linked to the
              last subject; it is skipped if there is no subject yet.
            * Any other target is linked to the most recently seen source if
              its word is one of the self words, otherwise to the latest
              source known at the end of the previous sentence. Targets that
              cannot be linked are parked under a dummy node, which is removed
              (together with everything below it) before returning.
        """
        coref_tree = _Tree()
        coref_tree.create_node(identifier=0)

        # Holder for targets without any source; dropped at the end.
        dummy_id = (-1, -1)
        coref_tree.create_node(identifier=dummy_id, parent=0, data=True)

        name2node = {}       # name => (tree_id, node_id)
        curr_source = None   # the current coref source
        last_source = None   # the last coref source
        last_subject = None  # the last coref subject

        # Find coref
        for tree_id, tree in enumerate(tree_list):

            # Get relations
            appositions = [
                (rel.head.identifier, rel.tail.identifier,)
                for rel in tree.get_relations()
                if rel.relation.data.role in _APPOSITION_ROLES
            ]

            # Get sources/targets (a node found as both is treated as a target)
            node_ids = {}
            for nid in cls._get_sources(tree):  # Source
                node_ids[nid] = True
            for nid in cls._get_targets(tree):  # Target
                node_ids[nid] = False
            subject_ids = set(cls._get_subjects(tree))  # Subject

            for nid, is_source in sorted(node_ids.items()):
                node_key = (tree_id, nid,)

                if is_source:  # Assign ref_id to sources
                    word = tree[nid].data.word
                    curr_source = node_key
                    parent_id = name2node.get(word, None)
                    if parent_id:
                        coref_tree.create_node(identifier=node_key, parent=parent_id, data=True)
                    else:
                        name2node[word] = node_key
                        coref_tree.create_node(identifier=node_key, parent=0, data=True)

                elif nid < 0:  # Link zero pronouns to the last subject
                    if last_subject:
                        coref_tree.create_node(identifier=node_key, parent=last_subject, data=False)

                # Link targets to previous sources
                elif curr_source and tree[nid].data.word in _SELF_WORDS:
                    coref_tree.create_node(identifier=node_key, parent=curr_source, data=False)
                elif last_source:
                    coref_tree.create_node(identifier=node_key, parent=last_source, data=False)
                else:
                    coref_tree.create_node(identifier=node_key, parent=dummy_id, data=False)

            # Merge apposition
            for head_nid, tail_nid in appositions:
                head_id = (tree_id, head_nid,)
                tail_id = (tree_id, tail_nid,)
                if not (coref_tree.contains(head_id) and coref_tree.contains(tail_id)):
                    continue
                if coref_tree.is_ancestor(head_id, tail_id) or coref_tree.is_ancestor(tail_id, head_id):
                    continue  # already in the same chain

                if coref_tree[head_id].data:    # Head is a source
                    coref_tree.move_node(tail_id, head_id)
                elif coref_tree[tail_id].data:  # Tail is a source
                    coref_tree.move_node(head_id, tail_id)
                else:
                    coref_tree.move_node(tail_id, head_id)

            # Update subject (targets are checked before sources, each by ascending id)
            for nid, _ in sorted(node_ids.items(), key=lambda item: item[::-1]):
                if nid in subject_ids:
                    last_subject = (tree_id, nid,)
                    break

            # Update source
            last_source = curr_source

        # Remove dummy node
        coref_tree.remove_node(dummy_id)

        return coref_tree

    @classmethod
    def _get_result(cls, tree_list, *, coref_tree):
        """Render the co-reference tree as a co-reference paragraph.

        Parameters
        ----------
            tree_list : list of :class:`ParsedTree`
                the parsed trees, one per sentence.
            coref_tree : :class:`treelib.Tree`
                the co-reference tree built by :meth:`_get_coref`.

        Returns
        -------
            :class:`CorefParagraph`
                one :class:`CorefSentence` per tree. Each leaf yields a token
                whose ``coref`` is ``None`` or ``(ref_id, kind)``, where
                ``kind`` is ``'source'`` for the node heading the chain and
                ``'target'`` for every other member. Zero pronouns yield an
                extra token with ``word=None``, ``idx=None`` and the kind
                ``'zero'``.
        """

        # Assign coref ID
        node2coref = {}    # (tree_id, node_id) => ref_id
        coref2source = {}  # ref_id => (tree_id, node_id) of the chain head
        for ref_id, coref_source in enumerate(coref_tree.children(coref_tree.root)):
            source_key = coref_source.identifier
            coref2source[ref_id] = source_key
            for node_key in coref_tree.expand_tree(source_key):
                node2coref[node_key] = ref_id

        # Generate result
        tokens_list = _CorefParagraph()
        for tree_id, tree in enumerate(tree_list):
            tokens = _CorefSentence()
            tokens_list.append(tokens)
            nodes = tree.leaves()

            if (tree_id, -1) in node2coref:
                # Zero pronoun in front of the whole sentence.
                ref_id = node2coref[tree_id, -1]
                tokens.append(_CorefToken(  # pylint: disable=no-value-for-parameter
                    word=None,
                    idx=None,
                    coref=(ref_id, 'zero'),
                ))

            elif (tree_id, -2) in node2coref:
                # The pos of the first leaf node starts with 'Cb'. e.g. 而且、但是、然而
                # Emit that leaf first, then the zero pronoun right after it.
                node = nodes.pop(0)
                tokens.append(_CorefToken(  # pylint: disable=no-value-for-parameter
                    word=node.data.word,
                    idx=node.identifier,
                    coref=None,
                ))
                ref_id = node2coref[tree_id, -2]
                tokens.append(_CorefToken(  # pylint: disable=no-value-for-parameter
                    word=None,
                    idx=None,
                    coref=(ref_id, 'zero'),
                ))

            for node in nodes:
                node_key = (tree_id, node.identifier,)
                ref_id = node2coref.get(node_key, -1)
                if ref_id >= 0:
                    # Node identifiers are only unique inside one tree, so the
                    # sentence index must take part in the comparison;
                    # otherwise a target sharing the source's node id in
                    # another sentence would be reported as the source.
                    kind = 'source' if node_key == coref2source[ref_id] else 'target'
                    coref = (ref_id, kind,)
                else:
                    coref = None
                tokens.append(_CorefToken(  # pylint: disable=no-value-for-parameter
                    word=node.data.word,
                    idx=node.identifier,
                    coref=coref,
                ))

        return tokens_list

    ########################################################################################################################

    @classmethod
    def _get_sources(cls, tree):
        """Get sources of a tree

        Parameters
        ----------
            tree : :class:`ParsedTree <ckipnlp.container.util.parsed_tree.ParsedTree>`
                the parser tree.

        Yields
        ------
            int
                the identifier of source nodes.

        Notes
        -----
            A leaf node can be a co-reference source if either:

            1. its POS-tag starts with `Nb`
            2. its POS-tag starts with `N` and its word is one of the human words from E-HowNet
        """
        for node in tree.leaves():
            if cls._is_human_word(node):
                yield node.identifier

    @classmethod
    def _get_targets(cls, tree):
        """Get targets of a tree

        Parameters
        ----------
            tree : :class:`ParsedTree <ckipnlp.container.util.parsed_tree.ParsedTree>`
                the parser tree.

        Yields
        ------
            int
                the identifier of target nodes.
            -2
                if the tree is a VP and the pos of the first leaf node starts with 'Cb'
            -1
                if the tree is a VP and the pos of the first leaf node does not start with 'Cb'

        Notes
        -----
            A leaf node can be a co-reference target if either:

            1. its POS-tag starts with `Nh`
            2. its POS-tag starts with `N` and its word is one of the pronoun words from E-HowNet
        """
        root = tree[tree.root]
        leaves = tree.leaves()

        # A VP sentence gets a zero-pronoun placeholder.
        if root.data.pos == 'VP':
            if leaves[0].data.pos.startswith('Cb'):
                yield -2  # coref will be inserted after this Cb node
            else:
                yield -1  # coref will be inserted in front of the whole sentence

        for node in leaves:
            if cls._is_pronoun_word(node):
                yield node.identifier

    ########################################################################################################################

    @classmethod
    def _get_subjects(cls, tree):
        """Get subjects of a tree

        Parameters
        ----------
            tree : :class:`ParsedTree <ckipnlp.container.util.parsed_tree.ParsedTree>`
                the parser tree.

        Yields
        ------
            int
                the identifier of subject node.
        """
        for node in tree.get_subjects():
            yield node.identifier

    ########################################################################################################################

    @staticmethod
    def _is_human_word(node):
        """Check whether the node is tagged `Nb*`, or is an `N*` node whose word is a human word."""
        pos = node.data.pos
        return pos.startswith('Nb') or (pos.startswith('N') and node.data.word in _HUMAN_WORDS)

    @staticmethod
    def _is_pronoun_word(node):
        """Check whether the node is tagged `Nh*`, or is an `N*` node whose word is a 3rd-person pronoun word."""
        pos = node.data.pos
        return pos.startswith('Nh') or (pos.startswith('N') and node.data.word in _PRONOUN_3RD_WORDS)

    @staticmethod
    def _getitem_deep(obj, idx0, idx1):
        """Return ``obj[idx0][idx1]``."""
        return obj[idx0][idx1]

    @staticmethod
    def _setitem_deep(obj, idx0, idx1, value):
        """Assign ``value`` to ``obj[idx0][idx1]``."""
        obj[idx0][idx1] = value
########################################################################################################################