'''This module provides classes and functions to calculate the rhythmic partitions from a given digital score.'''
import copy
import multiprocessing
import os
import music21
from fractions import Fraction
from tqdm import tqdm
from .lib.partition import Partition
from .lib.base import CustomException, EventLocation, GeneralSubparser, RPData, file_rename, find_nearest_smaller, make_fraction
SCORE_FILETYPES = [
'xml',
'mxl',
'krn',
'midi',
'mid',
]
[docs]
def aux_make_events_from_part(m21_part: music21.stream.Part) -> dict:
'''Return a dictionary with Musical Events and their locations from a given
Music21 part object.
'''
measures = m21_part.getElementsByClass(music21.stream.Measure)
events = {}
for m21_measure in measures:
notes_and_rests = m21_measure.notesAndRests
offset = 0 # manual calculation
for m21_obj in notes_and_rests:
m_event = MusicalEvent()
offset = m_event.set_data_from_m21_obj(m21_obj, m21_measure.number, m21_measure.offset, offset)
events.update({
m_event.global_offset: m_event
})
return events
[docs]
def aux_join_music_events(events: dict) -> dict:
'''Join `MusicalEvent`
This methods join adjacent tied objects as well as adjacent rests.
'''
# Add null event at the end
last_location = list(events.keys())[-1]
last_event = events[last_location]
last_location += last_event.duration + 1
current_event = MusicalEvent()
current_event.is_null = True
events.update({
last_location: MusicalEvent()
})
# Start with null
last_event = None
last_location = None
joined_events = {}
for location, current_event in events.items():
if current_event.is_null: # any - null
joined_events.update({last_location: last_event})
else:
if not last_event: # null - any
last_event = current_event
last_location = location
else:
if current_event.is_rest():
if last_event.is_rest(): # rest - rest
last_event.duration += current_event.duration
else: # note - rest
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else:
if last_event.is_rest(): # rest - note
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else: # note - note
if current_event.tie:
if current_event.tie == 'start': # note - note.start
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
else: # note - note.continue or note.stop
last_event.duration += current_event.duration
else:
joined_events.update({last_location: last_event})
last_event = current_event
last_location = location
return joined_events
[docs]
def make_music_events_from_part(m21_part: music21.stream.Part) -> dict:
'''Return a dictionary with location and Musical Events from a given
Music21 part object. Adjacent rests and tied notes are joined.
'''
events = aux_make_events_from_part(m21_part)
return aux_join_music_events(events)
[docs]
def split_part_chords(m21_part: music21.stream.Part) -> music21.stream.Part:
'''Return a new Music21 Part object with pitches extracted from chords of a given Music21 Part object.
This function splits chords with notes of distinct durations.
'''
extra_part = music21.stream.Part()
measures = m21_part.getElementsByClass(music21.stream.Measure)
has_split_data = False
for measure in measures:
m = copy.deepcopy(measure)
m.elements = () # Remove original measure's elements
chords = measure.getElementsByClass(music21.chord.Chord)
for chord in chords:
ch_duration = chord.duration
ch_offset = chord.offset
ch_tie = chord.tie
if chord.tie:
# Add rests
if not m.notesAndRests.stream():
dur = chord.offset
rest = music21.note.Rest()
rest.duration = music21.duration.Duration(dur)
m.insert(0, rest)
old_obj_pitches = []
new_obj_pitches = []
new_obj_tie = None
for p in chord.pitches:
if chord.getTie(p) == chord.tie:
old_obj_pitches.append(p)
else:
new_obj_pitches.append(p)
new_obj_tie = chord.getTie(p)
if len(new_obj_pitches) > 0:
has_split_data = True
chord.pitches = tuple(old_obj_pitches)
# Handle old pitches
if len(old_obj_pitches) == 1:
active_site = chord.activeSite
ind = chord.activeSite.index(chord)
chord.activeSite.pop(ind)
obj = music21.note.Note(old_obj_pitches[0])
obj.duration = ch_duration
obj.offset = ch_offset
obj.tie = ch_tie
active_site.insert(obj.offset, obj)
# Handle new pitches
if len(new_obj_pitches) == 1:
note = music21.note.Note(new_obj_pitches[0])
note.duration = chord.duration
note.offset = chord.offset
m.insert(note.offset, note)
else:
new_chord = music21.chord.Chord()
new_chord.duration = chord.duration
new_chord.offset = chord.offset
new_chord.tie = new_obj_tie
new_chord_pitches = list(new_chord.pitches)
for p in new_obj_pitches:
new_chord_pitches.append(p)
new_chord.pitches = tuple(new_chord_pitches)
m.insert(new_chord.offset, new_chord)
extra_part.insert(m.offset, m)
if has_split_data:
ms = extra_part.getElementsByClass(music21.stream.Measure)
for m in ms:
original_m = m21_part.measure(m.number)
m.offset = original_m.offset
m.duration = original_m.duration
return extra_part
[docs]
def split_score(filename: str) -> music21.stream.Score:
'''Parse a given digital score file, split chords, convert voices to parts and returns a new Music21 Score object.'''
parts = []
try:
sco = music21.converter.parse(filename, quantizePost=False)
except:
raise CustomException('Error on given score parsing {}.'.format(filename))
new_sco = music21.stream.Score()
print('Parsing the given score...')
for m21_part in tqdm(sco.parts):
for _part in m21_part.voicesToParts():
new_part = copy.deepcopy(_part)
extra = split_part_chords(new_part)
new_sco.insert(0, new_part)
parts.append(new_part)
if extra:
new_sco.insert(0, extra)
parts.append(extra)
return new_sco
[docs]
def make_offset_map(m21part: music21.stream.Part) -> dict:
'''Create map with measure number and global offset value.'''
aux_offset_map = {}
for measure in m21part.getElementsByClass(music21.stream.Measure):
number = measure.number
measure_offset = make_fraction(measure.offset)
if measure_offset not in aux_offset_map.keys():
aux_offset_map.update({measure_offset: number})
# Recalculate measure numbers for score fixing in ritornello cases
new_offset_map = {}
if 0 in aux_offset_map.values():
measure_number = 0
else:
measure_number = 1
for global_offset in aux_offset_map.keys():
new_offset_map[measure_number] = global_offset
measure_number += 1
return new_offset_map
[docs]
class MusicalEvent(object):
'''Auxiliary musical event class.
This class has only the needed attributes of Music21's Note, Rest and Chord classes.
'''
def __init__(self, **kwargs):
self.measure_number = None
self.offset = Fraction(0)
self.global_offset = Fraction(0)
self.number_of_pitches = 0
self.duration = 0
self.tie = None
self.m21_class = None
self.is_null = False
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __str__(self) -> str:
return ' '.join(list(map(str, [self.number_of_pitches, self.duration, self.tie])))
def __repr__(self):
return '<E {}>'.format(self.__str__())
[docs]
def is_rest(self):
'''Check if the current object represents a music21' rest object.'''
return self.m21_class == music21.note.Rest
[docs]
def set_data_from_m21_obj(self, m21_obj, measure_number, measure_offset, offset=None):
'''Get data from given Music21 object, set as current object's attributes, and return offset and duration.'''
self.measure_number = measure_number
self.offset = offset
self.global_offset = self.offset + make_fraction(measure_offset)
self.duration = make_fraction(m21_obj.duration.quarterLength)
self.m21_class = m21_obj.__class__
if self.is_rest():
self.number_of_pitches = 0
else:
if m21_obj.isNote:
self.number_of_pitches = 1
else:
self.number_of_pitches = len(m21_obj.pitches)
if m21_obj.tie:
if m21_obj.tie.type in ['start', 'continue', 'stop']:
self.tie = m21_obj.tie.type
return offset + self.duration
[docs]
class SingleEvent(object):
'''Auxiliary single event. It's more simple than Music21's note and rest objects and has useful attributes such ass number of pitches and sounding.'''
def __init__(self, **kwargs):
self.number_of_pitches = 0
self.duration = Fraction(0)
self.measure_number = 0
self.offset = Fraction(0)
self.sounding = False
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
event_location = EventLocation(measure_number=self.measure_number, offset=self.offset)
ind = event_location.str_index
return '<SE ({}) num={} dur={} snd={}>'.format(ind, self.number_of_pitches, self.duration, self.sounding)
[docs]
class Parsema(object):
'''Auxiliary Parsema class.
Parsema is the set of adjacent equal partitions. See Gentil-Nunes 2009 for further information.'''
def __init__(self, **kwargs):
self.measure_number = None
self.offset = None
self.global_offset = None
self.duration = 0
self.single_events = []
self.partition = None
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
partition_str = ''
if self.partition:
partition_str = self.partition.as_string()
event_loc = EventLocation(measure_number=self.measure_number, offset=self.offset)
return '<Pma: {} ({}), dur {}>'.format(partition_str, event_loc, self.duration)
[docs]
def add_single_events(self, single_events: list) -> None:
'''Set single events, calculate their durations and set partition.'''
self.single_events = single_events
durations = [event.duration for event in single_events if event]
if durations:
self.duration = min(durations)
self.set_partition()
[docs]
def set_partition(self) -> None:
'''Create `Partition` object from single events attribute and set it to `partition` attribute.'''
parts = {}
number_of_pitches_set = set([
s_event.number_of_pitches
for s_event in self.single_events
])
if list(number_of_pitches_set) == [0]:
self.partition = Partition([])
return
for s_event in self.single_events:
key = (s_event.sounding, s_event.duration)
if key not in parts.keys() and s_event.number_of_pitches > 0:
parts[key] = 0
if s_event.number_of_pitches > 0:
parts[key] += s_event.number_of_pitches
self.partition = Partition(sorted(parts.values()))
[docs]
class PartSoundingMap(object):
'''Sounding Map class of a musical part.'''
def __init__(self, **kwargs) -> None:
self.single_events = None
self.attack_global_offsets = []
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __str__(self) -> int:
return len(self.single_events.keys())
def __repr__(self) -> str:
return '<PSM: {} events>'.format(self.__str__())
[docs]
def set_from_m21_part(self, m21_part: music21.stream.Part) -> None:
'''Set Music21's part elements into `single_events` attribute.
Create `MusicEvent` objects for each event and then, create `SingleEvent` objects to add to `single_events` attribute.'''
music_events = make_music_events_from_part(m21_part)
self.single_events = {}
for global_offset, m_event in music_events.items():
# interval: closed start and open end.
closed_beginning = global_offset
open_ending = closed_beginning + m_event.duration
single_event = SingleEvent()
single_event.number_of_pitches = m_event.number_of_pitches
single_event.duration = m_event.duration
single_event.measure_number = m_event.measure_number
single_event.offset = m_event.offset
self.single_events.update({
(closed_beginning, open_ending): single_event
})
self.attack_global_offsets.append(closed_beginning)
[docs]
def get_single_event_by_location(self, global_offset: Fraction) -> SingleEvent:
'''Return a `SingleEvent` object from its location.'''
beginning = find_nearest_smaller(global_offset, self.attack_global_offsets)
if beginning == -1: # No event to return
return
ind = self.attack_global_offsets.index(beginning)
_, ending = list(self.single_events.keys())[ind]
s_event = None
if global_offset >= beginning and global_offset < ending:
s_event = copy.deepcopy(self.single_events[(beginning, ending)])
duration_diff = global_offset - beginning
duration = s_event.duration
duration = duration - duration_diff
sounding = duration_diff > 0
s_event.duration = duration
if s_event.number_of_pitches > 0:
s_event.sounding = sounding
else:
s_event.sounding = False
return s_event
[docs]
class ScoreSoundingMap(object):
'''Sounding Map class of a musical score. Individual parts are sounding maps.'''
def __init__(self, **kwargs) -> None:
self.sounding_maps = []
self.attacks = []
self.measure_offsets = {}
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
return '<SSM: {} maps, {} attacks>'.format(len(self.sounding_maps), len(self.attacks))
[docs]
def add_part_sounding_map(self, m21_part: music21.stream.Part) -> None:
'''Creates a `PartSoundingMap` from a given Music21 Part and add it to sounding_maps and attacks attributes.'''
psm = PartSoundingMap()
psm.set_from_m21_part(m21_part)
if psm.single_events:
self.sounding_maps.append(psm)
self.attacks.extend(psm.attack_global_offsets)
self.attacks = sorted(set(self.attacks))
[docs]
def add_score_sounding_maps(self, m21_score: music21.stream.Score) -> None:
'''Create `PartSoundingMap` objects from each part of a given Music21 Score.
This method also get measure offsets and explodes voices into parts.'''
# Get and fill measure offsets
self.measure_offsets = make_offset_map(m21_score.parts[0])
# Get and fill sounding parts
print('Getting and filling sounding parts...')
for m21_part in tqdm(m21_score.parts):
for _part in m21_part.voicesToParts():
self.add_part_sounding_map(copy.deepcopy(_part))
[docs]
def get_single_events_by_location(self, global_offset: Fraction) -> list:
'''Return a list of `SingleEvent` objects in different sounding_maps from their locations.'''
single_events = []
for sounding_map in self.sounding_maps:
s_event = sounding_map.get_single_event_by_location(global_offset)
if s_event:
single_events.append(s_event)
return single_events
[docs]
def make_parsemae(self) -> list:
'''Return a list of `Parsema` objects from `attacks` attribute.
This method also handles merged parsemae.'''
parsemae = []
offset_map = {ofs: ms for ms, ofs in self.measure_offsets.items()}
all_offsets = list(offset_map.keys())
for attack in self.attacks:
measure_offset = find_nearest_smaller(attack, all_offsets)
measure_number = offset_map[measure_offset]
offset = make_fraction(attack) - make_fraction(measure_offset)
parsema = Parsema()
parsema.add_single_events(self.get_single_events_by_location(attack))
parsema.global_offset = attack
parsema.measure_number = measure_number
parsema.offset = offset
parsemae.append(parsema)
if not parsemae:
return
# Merge parsemae
merged_parsemae = []
first_parsema = parsemae[0]
for parsema in parsemae[1:]:
if parsema.partition.parts == first_parsema.partition.parts:
first_parsema.duration += parsema.duration
else:
merged_parsemae.append(first_parsema)
first_parsema = parsema
merged_parsemae.append(first_parsema)
return merged_parsemae
[docs]
class ParsemaeSegment(object):
'''Parsema segment class.'''
def __init__(self, **kwargs) -> None:
self.parsemae = []
self._measure_offsets = {}
if 'kwargs' in kwargs:
self.__dict__.update(kwargs['kwargs'])
def __eq__(self, __o: object) -> bool:
return self.__dict__ == __o.__dict__
def __repr__(self) -> str:
return '<PS: {} parsemae>'.format(len(self.parsemae))
[docs]
def make_from_music21_score(self, m21_score: music21.stream.Score) -> None:
'''Create `Parsema` objects fom given Music21 Score object and store at `parsemae` class attribute.'''
ssm = ScoreSoundingMap()
ssm.add_score_sounding_maps(m21_score)
self.parsemae = ssm.make_parsemae()
self._measure_offsets = ssm.measure_offsets
[docs]
def get_data(self) -> tuple:
'''Get partitions, agglomeration, and dispersion data and their locations.'''
data = {
'Index': [], # 0
'Measure number': [], # 1
'Offset': [], # 2
'Global offset': [], # 3
'Duration': [], # 4
'Partition': [], # 5
'Density-number': [], # 6
'Agglomeration': [], # 7
'Dispersion': [], # 8
'Parts': [], # 9
}
values_map = {}
for parsema in self.parsemae:
event_location = EventLocation(measure_number = parsema.measure_number, offset=parsema.offset)
partition = parsema.partition
partition_str = partition.as_string()
agglomeration = partition.get_agglomeration_index()
dispersion = partition.get_dispersion_index()
data['Index'].append(event_location.str_index)
data['Measure number'].append(parsema.measure_number)
data['Offset'].append(parsema.offset)
data['Global offset'].append(parsema.global_offset)
data['Duration'].append(parsema.duration)
# data['Offset'].append(fraction_to_string(parsema.offset))
# data['Global offset'].append(fraction_to_string(parsema.global_offset))
# data['Duration'].append(fraction_to_string(parsema.duration))
data['Partition'].append(partition_str)
data['Density-number'].append(partition.get_density_number())
data['Agglomeration'].append(agglomeration)
data['Dispersion'].append(dispersion)
data['Parts'].append(partition.parts)
if partition_str not in values_map.keys():
values_map[partition_str] = (agglomeration, dispersion)
return data, values_map
[docs]
def make_rpdata(self, filename: str) -> RPData:
'''Return `RPData` object from `parsemae` class attribute.'''
rpdata = RPData()
rpdata.path = file_rename(filename, 'json')
offset_map_orig = self._measure_offsets # values as fractions
offset_map_conv = {mn: offset for mn, offset in offset_map_orig.items()} # values as strings
# offset_map_conv = {mn: fraction_to_string(offset) for mn, offset in offset_map_orig.items()} # values as strings
rpdata.data, rpdata.values_map = self.get_data()
rpdata.partitions = rpdata.data['Partition']
rpdata.size = len(rpdata.partitions)
rpdata.offset_map = offset_map_conv
return rpdata
[docs]
def main(filename, csv, equally_sized):
sco = split_score(filename)
segment = ParsemaeSegment()
segment.make_from_music21_score(sco)
rpdata = segment.make_rpdata(filename)
rpdata.save_to_file()
if csv:
rpdata.save_to_csv(equally_sized)
[docs]
class Subparser(GeneralSubparser):
'''Implements argparser.'''
[docs]
def setup(self) -> None:
self.program_name = 'calc'
self.program_help = 'Calculator'
self.add_parent = False
[docs]
def add_arguments(self) -> None:
self.parser.add_argument('filename', help='digital score filename (XML, MXL, MIDI and KRN)', type=str)
self.parser.add_argument('-d', '--dir', help='folder with digital score files', default=False, action='store_true')
self.parser.add_argument('-m', '--multiprocessing', help='multiprocessing', default=False, action='store_true')
self.parser.add_argument('-c', '--csv', help='output data in a CSV file.', default=False, action='store_true')
self.parser.add_argument('-e', '--equally_sized', help='generate equally-sized events', default=False, action='store_true')
[docs]
def handle(self, args):
print('Running script on {} file...'.format(args.filename))
if args.dir:
filetypes = SCORE_FILETYPES[:]
filetypes.extend([f.upper() for f in SCORE_FILETYPES])
filetypes.extend([f.lower() for f in SCORE_FILETYPES])
filetypes = list(set(filetypes))
if not os.path.isdir(args.filename):
raise CustomException('The given filename is not of directory type.')
else:
files = [
f for f in map(lambda x: os.path.join(args.filename, x), sorted(os.listdir(args.filename)))
if os.path.isfile(f) and f.split('.')[-1] in filetypes
]
total_cpus = multiprocessing.cpu_count()
if args.multiprocessing and total_cpus > 3:
with multiprocessing.Pool(total_cpus - 2) as p:
out = p.starmap(main, [(f, args.csv, args.equally_sized) for f in files])
else:
for f in files:
main(f, args.csv, args.equally_sized)
else:
main(args.filename, args.csv, args.equally_sized)