'''This module provides classes and functions to calculate the rhythmic partitions from a given digital score.'''
import copy
import music21
from fractions import Fraction
from tqdm import tqdm
from .lib.partition import Partition
from .lib.base import CustomException, EventLocation, GeneralSubparser, RPData, file_rename, find_nearest_smaller, make_fraction
[docs]def aux_make_events_from_part(m21_part: music21.stream.Part) -> dict:
'''Return a dictionary with Musical Events and their locations from a given
Music21 part object.
'''
measures = m21_part.getElementsByClass(music21.stream.Measure)
events = {}
for m21_measure in measures:
notes_and_rests = m21_measure.notesAndRests
offset = 0 # manual calculation
for m21_obj in notes_and_rests:
m_event = MusicalEvent()
offset = m_event.set_data_from_m21_obj(m21_obj, m21_measure.number, m21_measure.offset, offset)
events.update({
m_event.global_offset: m_event
})
return events
[docs]def aux_join_music_events(events: dict) -> dict:
'''Join `MusicalEvent`
This methods join adjacent tied objects as well as adjacent rests.
'''
# Add null event at the end
last_location = list(events.keys())[-1]
last_event = events[last_location]
last_location += last_event.duration + 1
current_event = MusicalEvent()
current_event.is_null = True
events.update({
last_location: MusicalEvent()
})
# Start with null
last_event = None
last_location = None
joined_events = {}
for location, current_event in events.items():
if current_event.is_null: # any - null
joined_events.update({last_location: last_event})
else:
if not last_event: # null - any
last_event = current_event
last_location = location
else:
if current_event.is_rest():
if last_event.is_rest(): # rest - rest
last_event.duration += current_event.duration
else: # note - rest
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else:
if last_event.is_rest(): # rest - note
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else: # note - note
if current_event.tie:
if current_event.tie == 'start': # note - note.start
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else: # note - note.continue or note.stop
last_event.duration += current_event.duration
else:
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
return joined_events
[docs]def make_music_events_from_part(m21_part: music21.stream.Part) -> dict:
'''Return a dictionary with location and Musical Events from a given
Music21 part object. Adjacent rests and tied notes are joined.
'''
events = aux_make_events_from_part(m21_part)
return aux_join_music_events(events)
[docs]def auxiliary_get_duration(m21_obj) -> Fraction:
'''Return the duration of the given Music21 object as a Fraction object.'''
if m21_obj.duration.tuplets:
tup = m21_obj.duration.tuplets[0]
num = tup.numberNotesNormal
den = tup.numberNotesActual
dur = Fraction(num, den) / 2
else:
dur = make_fraction(m21_obj.duration.quarterLength)
return dur
[docs]def split_part_chords(m21_part: music21.stream.Part) -> music21.stream.Part:
'''Return a new Music21 Part object with pitches extracted from chords of a given Music21 Part object.
This function splits chords with notes of distinct durations.
'''
extra_part = music21.stream.Part()
measures = m21_part.getElementsByClass(music21.stream.Measure)
has_split_data = False
for measure in measures:
m = copy.deepcopy(measure)
m.elements = () # Remove original measure's elements
chords = measure.getElementsByClass(music21.chord.Chord)
for chord in chords:
ch_duration = chord.duration
ch_offset = chord.offset
ch_tie = chord.tie
if chord.tie:
# Add rests
if not m.notesAndRests.stream():
dur = chord.offset
rest = music21.note.Rest()
rest.duration = music21.duration.Duration(dur)
m.insert(0, rest)
old_obj_pitches = []
new_obj_pitches = []
new_obj_tie = None
for p in chord.pitches:
if chord.getTie(p) == chord.tie:
old_obj_pitches.append(p)
else:
new_obj_pitches.append(p)
new_obj_tie = chord.getTie(p)
if len(new_obj_pitches) > 0:
has_split_data = True
chord.pitches = tuple(old_obj_pitches)
# Handle old pitches
if len(old_obj_pitches) == 1:
active_site = chord.activeSite
ind = chord.activeSite.index(chord)
chord.activeSite.pop(ind)
obj = music21.note.Note(old_obj_pitches[0])
obj.duration = ch_duration
obj.offset = ch_offset
obj.tie = ch_tie
active_site.insert(obj.offset, obj)
# Handle new pitches
if len(new_obj_pitches) == 1:
note = music21.note.Note(new_obj_pitches[0])
note.duration = chord.duration
note.offset = chord.offset
m.insert(note.offset, note)
else:
new_chord = music21.chord.Chord()
new_chord.duration = chord.duration
new_chord.offset = chord.offset
new_chord.tie = new_obj_tie
new_chord_pitches = list(new_chord.pitches)
for p in new_obj_pitches:
new_chord_pitches.append(p)
new_chord.pitches = tuple(new_chord_pitches)
m.insert(new_chord.offset, new_chord)
extra_part.insert(m.offset, m)
if has_split_data:
ms = extra_part.getElementsByClass(music21.stream.Measure)
for m in ms:
original_m = m21_part.measure(m.number)
m.offset = original_m.offset
m.duration = original_m.duration
return extra_part
[docs]def split_score(filename: str) -> music21.stream.Score:
'''Parse a given digital score file, split chords, convert voices to parts and returns a new Music21 Score object.'''
parts = []
try:
sco = music21.converter.parse(filename)
except:
raise CustomException('Error on given score parsing.')
new_sco = music21.stream.Score()
print('Parsing the given score...')
for m21_part in tqdm(sco.parts):
for _part in m21_part.voicesToParts():
new_part = copy.deepcopy(_part)
extra = split_part_chords(new_part)
new_sco.insert(0, new_part)
parts.append(new_part)
if extra:
new_sco.insert(0, extra)
parts.append(extra)
return new_sco
[docs]def make_offset_map(m21part: music21.stream.Part) -> dict:
'''Create map with measure number and global offset value.'''
aux_offset_map = {}
for measure in m21part.getElementsByClass(music21.stream.Measure):
number = measure.number
measure_offset = make_fraction(measure.offset)
if measure_offset not in aux_offset_map.keys():
aux_offset_map.update({measure_offset: number})
# Recalculate measure numbers for score fixing.
i = 0
new_offset_map = {}
for k, v in aux_offset_map.items():
if v == 1:
i += 1
new_offset_map[i] = k
i += 1
return new_offset_map
[docs]class MusicalEvent(object):
'''Auxiliary musical event class.
This class has only the needed attributes of Music21's Note, Rest and Chord classes.
'''
def __init__(self, **kwargs):
self.measure_number = None
self.offset = Fraction(0)
self.global_offset = Fraction(0)
self.number_of_pitches = 0
self.duration = 0
self.tie = None
self.m21_class = None
self.is_null = False
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __str__(self) -> str:
return ' '.join(list(map(str, [self.number_of_pitches, self.duration, self.tie])))
def __repr__(self):
return '<E {}>'.format(self.__str__())
[docs] def is_rest(self):
'''Check if the current object represents a music21' rest object.'''
return self.m21_class == music21.note.Rest
[docs] def set_data_from_m21_obj(self, m21_obj, measure_number, measure_offset, offset=None):
'''Get data from given Music21 object, set as current object's attributes, and return offset and duration.'''
self.measure_number = measure_number
self.offset = offset
self.global_offset = self.offset + make_fraction(measure_offset)
self.duration = auxiliary_get_duration(m21_obj)
self.m21_class = m21_obj.__class__
if self.is_rest():
self.number_of_pitches = 0
else:
if m21_obj.isNote:
self.number_of_pitches = 1
else:
self.number_of_pitches = len(m21_obj.pitches)
if m21_obj.tie:
if m21_obj.tie.type in ['start', 'continue', 'stop']:
self.tie = m21_obj.tie.type
return offset + self.duration
[docs]class SingleEvent(object):
'''Auxiliary single event. It's more simple than Music21's note and rest objects and has useful attributes such ass number of pitches and sounding.'''
def __init__(self, **kwargs):
self.number_of_pitches = 0
self.duration = Fraction(0)
self.measure_number = 0
self.offset = Fraction(0)
self.sounding = False
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
event_location = EventLocation(measure_number=self.measure_number, offset=self.offset)
ind = event_location.str_index
return '<SE ({}) num={} dur={} snd={}>'.format(ind, self.number_of_pitches, self.duration, self.sounding)
[docs]class Parsema(object):
'''Auxiliary Parsema class.
Parsema is the set of adjacent equal partitions. See Gentil-Nunes 2009 for further information.'''
def __init__(self, **kwargs):
self.measure_number = None
self.offset = None
self.global_offset = None
self.duration = 0
self.single_events = []
self.partition = None
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
partition_str = ''
if self.partition:
partition_str = self.partition.as_string()
event_loc = EventLocation(measure_number=self.measure_number, offset=self.offset)
return '<Pma: {} ({}), dur {}>'.format(partition_str, event_loc, self.duration)
[docs] def add_single_events(self, single_events: list) -> None:
'''Set single events, calculate their durations and set partition.'''
self.single_events = single_events
durations = [event.duration for event in single_events if event]
if durations:
self.duration = min(durations)
self.set_partition()
[docs] def set_partition(self) -> None:
'''Create `Partition` object from single events attribute and set it to `partition` attribute.'''
parts = {}
number_of_pitches_set = set([
s_event.number_of_pitches
for s_event in self.single_events
])
if list(number_of_pitches_set) == [0]:
self.partition = Partition([])
return
for s_event in self.single_events:
key = (s_event.sounding, s_event.duration)
if key not in parts.keys() and s_event.number_of_pitches > 0:
parts[key] = 0
if s_event.number_of_pitches > 0:
parts[key] += s_event.number_of_pitches
self.partition = Partition(sorted(parts.values()))
[docs]class PartSoundingMap(object):
'''Sounding Map class of a musical part.'''
def __init__(self, **kwargs) -> None:
self.single_events = None
self.attack_global_offsets = []
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __str__(self) -> int:
return len(self.single_events.keys())
def __repr__(self) -> str:
return '<PSM: {} events>'.format(self.__str__())
[docs] def set_from_m21_part(self, m21_part: music21.stream.Part) -> None:
'''Set Music21's part elements into `single_events` attribute.
Create `MusicEvent` objects for each event and then, create `SingleEvent` objects to add to `single_events` attribute.'''
music_events = make_music_events_from_part(m21_part)
self.single_events = {}
for global_offset, m_event in music_events.items():
# interval: closed start and open end.
closed_beginning = global_offset
open_ending = closed_beginning + m_event.duration
single_event = SingleEvent()
single_event.number_of_pitches = m_event.number_of_pitches
single_event.duration = m_event.duration
single_event.measure_number = m_event.measure_number
single_event.offset = m_event.offset
self.single_events.update({
(closed_beginning, open_ending): single_event
})
self.attack_global_offsets.append(closed_beginning)
[docs] def get_single_event_by_location(self, global_offset: Fraction) -> SingleEvent:
'''Return a `SingleEvent` object from its location.'''
beginning = find_nearest_smaller(global_offset, self.attack_global_offsets)
if beginning == -1: # No event to return
return
ind = self.attack_global_offsets.index(beginning)
_, ending = list(self.single_events.keys())[ind]
s_event = None
if global_offset >= beginning and global_offset < ending:
s_event = copy.deepcopy(self.single_events[(beginning, ending)])
duration_diff = global_offset - beginning
duration = s_event.duration
duration = duration - duration_diff
sounding = duration_diff > 0
s_event.duration = duration
if s_event.number_of_pitches > 0:
s_event.sounding = sounding
else:
s_event.sounding = False
return s_event
[docs]class ScoreSoundingMap(object):
'''Sounding Map class of a musical score. Individual parts are sounding maps.'''
def __init__(self, **kwargs) -> None:
self.sounding_maps = []
self.attacks = []
self.measure_offsets = {}
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
return '<SSM: {} maps, {} attacks>'.format(len(self.sounding_maps), len(self.attacks))
[docs] def add_part_sounding_map(self, m21_part: music21.stream.Part) -> None:
'''Creates a `PartSoundingMap` from a given Music21 Part and add it to sounding_maps and attacks attributes.'''
psm = PartSoundingMap()
psm.set_from_m21_part(m21_part)
if psm.single_events:
self.sounding_maps.append(psm)
self.attacks.extend(psm.attack_global_offsets)
self.attacks = sorted(set(self.attacks))
[docs] def add_score_sounding_maps(self, m21_score: music21.stream.Score) -> None:
'''Create `PartSoundingMap` objects from each part of a given Music21 Score.
This method also get measure offsets and explodes voices into parts.'''
# Get and fill measure offsets
self.measure_offsets = make_offset_map(m21_score.parts[0])
# Get and fill sounding parts
print('Getting and filling sounding parts...')
for m21_part in tqdm(m21_score.parts):
for _part in m21_part.voicesToParts():
self.add_part_sounding_map(copy.deepcopy(_part))
[docs] def get_single_events_by_location(self, global_offset: Fraction) -> list:
'''Return a list of `SingleEvent` objects in different sounding_maps from their locations.'''
single_events = []
for sounding_map in self.sounding_maps:
s_event = sounding_map.get_single_event_by_location(global_offset)
if s_event:
single_events.append(s_event)
return single_events
[docs] def make_parsemae(self) -> list:
'''Return a list of `Parsema` objects from `attacks` attribute.
This method also handles merged parsemae.'''
parsemae = []
offset_map = {ofs: ms for ms, ofs in self.measure_offsets.items()}
all_offsets = list(offset_map.keys())
for attack in self.attacks:
measure_offset = find_nearest_smaller(attack, all_offsets)
measure_number = offset_map[measure_offset]
offset = make_fraction(attack) - make_fraction(measure_offset)
parsema = Parsema()
parsema.add_single_events(self.get_single_events_by_location(attack))
parsema.global_offset = attack
parsema.measure_number = measure_number
parsema.offset = offset
parsemae.append(parsema)
if not parsemae:
return
# Merge parsemae
merged_parsemae = []
first_parsema = parsemae[0]
for parsema in parsemae[1:]:
if parsema.partition.parts == first_parsema.partition.parts:
first_parsema.duration += parsema.duration
else:
merged_parsemae.append(first_parsema)
first_parsema = parsema
merged_parsemae.append(first_parsema)
return merged_parsemae
[docs]class ParsemaeSegment(object):
'''Parsema segment class.'''
def __init__(self, **kwargs) -> None:
self.parsemae = []
self._measure_offsets = {}
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
return '<PS: {} parsemae>'.format(len(self.parsemae))
[docs] def make_from_music21_score(self, m21_score: music21.stream.Score) -> None:
'''Create `Parsema` objects fom given Music21 Score object and store at `parsemae` class attribute.'''
ssm = ScoreSoundingMap()
ssm.add_score_sounding_maps(m21_score)
self.parsemae = ssm.make_parsemae()
self._measure_offsets = ssm.measure_offsets
[docs] def get_data(self) -> tuple:
'''Get partitions, agglomeration, and dispersion data and their locations.'''
data = {
'Index': [], # 0
'Measure number': [], # 1
'Offset': [], # 2
'Global offset': [], # 3
'Duration': [], # 4
'Partition': [], # 5
'Density-number': [], # 6
'Agglomeration': [], # 7
'Dispersion': [], # 8
}
values_map = {}
for parsema in self.parsemae:
event_location = EventLocation(measure_number = parsema.measure_number, offset=parsema.offset)
partition = parsema.partition
partition_str = partition.as_string()
agglomeration = partition.get_agglomeration_index()
dispersion = partition.get_dispersion_index()
data['Index'].append(event_location.str_index)
data['Measure number'].append(parsema.measure_number)
data['Offset'].append(parsema.offset)
data['Global offset'].append(parsema.global_offset)
data['Duration'].append(parsema.duration)
# data['Offset'].append(fraction_to_string(parsema.offset))
# data['Global offset'].append(fraction_to_string(parsema.global_offset))
# data['Duration'].append(fraction_to_string(parsema.duration))
data['Partition'].append(partition_str)
data['Density-number'].append(partition.get_density_number())
data['Agglomeration'].append(agglomeration)
data['Dispersion'].append(dispersion)
if partition_str not in values_map.keys():
values_map[partition_str] = (agglomeration, dispersion)
return data, values_map
[docs] def make_rpdata(self, filename: str) -> RPData:
'''Return `RPData` object from `parsemae` class attribute.'''
rpdata = RPData()
rpdata.path = file_rename(filename, 'json')
offset_map_orig = self._measure_offsets # values as fractions
offset_map_conv = {mn: offset for mn, offset in offset_map_orig.items()} # values as strings
# offset_map_conv = {mn: fraction_to_string(offset) for mn, offset in offset_map_orig.items()} # values as strings
rpdata.data, rpdata.values_map = self.get_data()
rpdata.partitions = rpdata.data['Partition']
rpdata.size = len(rpdata.partitions)
rpdata.offset_map = offset_map_conv
return rpdata
[docs]class Subparser(GeneralSubparser):
'''Implements argparser.'''
[docs] def setup(self) -> None:
self.program_name = 'calc'
self.program_help = 'Calculator'
self.add_parent = False
[docs] def add_arguments(self) -> None:
self.parser.add_argument('filename', help='digital score filename (XML, MXL, MIDI and KRN)', type=str)
self.parser.add_argument('-c', '--csv', help='output data in a CSV file.', default=False, action='store_true')
self.parser.add_argument('-e', '--equally_sized', help='generate equally-sized events', default=False, action='store_true')
[docs] def handle(self, args):
print('Running script on {} file...'.format(args.filename))
sco = split_score(args.filename)
segment = ParsemaeSegment()
segment.make_from_music21_score(sco)
rpdata = segment.make_rpdata(args.filename)
rpdata.save_to_file()
if args.csv:
rpdata.save_to_csv(args.equally_sized)