Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion neo/io/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,7 @@
from neo.io.brainwaresrcio import BrainwareSrcIO
from neo.io.cedio import CedIO
from neo.io.elanio import ElanIO
# from neo.io.elphyio import ElphyIO
from neo.io.elphyio import ElphyIO
from neo.io.exampleio import ExampleIO
from neo.io.igorproio import IgorIO
from neo.io.intanio import IntanIO
Expand Down
57 changes: 38 additions & 19 deletions neo/io/elphyio.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,7 @@
from neo.io.baseio import BaseIO

# to import from core
from neo.core import (Block, Segment,
AnalogSignal, Event, SpikeTrain)
from neo.core import (Block, Segment, AnalogSignal, Event, SpikeTrain)


# --------------------------------------------------------
Expand Down Expand Up @@ -562,7 +561,9 @@ def get_protocol_and_version(self):

def get_title(self):
title_length, title = struct.unpack('<B20s', self.file.read(21))
return unicode(title[0:title_length])
if hasattr(title, 'decode'):
title = title.decode()
return str(title[0:title_length])

def get_user_file_info(self):
header = dict()
Expand Down Expand Up @@ -673,8 +674,10 @@ def get_protocol_and_version(self):
def get_title(self):
title_length = read_from_char(self.file, 'B')
title, = struct.unpack('<%ss' % title_length, self.file.read(title_length))
if hasattr(title, 'decode'):
title = title.decode()
self.file.seek(self.file.tell() + 255 - title_length)
return unicode(title)
return title

def get_user_file_info(self):
header = dict()
Expand Down Expand Up @@ -901,10 +904,14 @@ def __init__(self, layout):
assert not ((n_channels < 1) or (n_channels > 16)), "bad number of channels"
nbpt = read_from_char(fileobj, 'h')
l_xu, x_unit = struct.unpack('<B3s', fileobj.read(4))
if hasattr(x_unit, 'decode'):
x_unit = x_unit.decode()
# extract units for each channel
y_units = list()
for i in range(1, 7):
l_yu, y_unit = struct.unpack('<B3s', fileobj.read(4))
if hasattr(y_unit, 'decode'):
y_unit = y_unit.decode()
y_units.append(y_unit[0:l_yu])

# extract i1, i2, x1, x2 and compute dX and X0
Expand Down Expand Up @@ -1224,6 +1231,8 @@ def __init__(self, layout, identifier, start, size, fixed_length=None, size_form
Y0_ar = list()
for _ in range(0, 16):
l_yu, yu, dY, Y0 = struct.unpack('<B10sdd', layout.file.read(27))
if hasattr(yu, 'decode'):
yu = yu.decode()
y_units.append(yu[0:l_yu])
dY_ar.append(dY)
Y0_ar.append(Y0)
Expand Down Expand Up @@ -1530,6 +1539,8 @@ def __init__(self, layout, identifier, start, size, fixed_length=None, size_form
self.Y0_ar = list()
for _ in range(0, n_channels):
l_yu, y_unit, dY, Y0 = struct.unpack('<B10sdd', fileobj.read(27))
if hasattr(y_unit, 'decode'):
y_unit = y_unit.decode()
self.y_units.append(y_unit[0:l_yu])
self.dY_ar.append(dY)
self.Y0_ar.append(Y0)
Expand Down Expand Up @@ -1683,7 +1694,7 @@ def least_common_multiple(a, b):
"""
Return the value of the least common multiple.
"""
return (a * b) / gcd(a, b)
return int((a * b) / gcd(a, b))


# --------------------------------------------------------
Expand Down Expand Up @@ -1858,7 +1869,7 @@ def create_bit_mask(self, ep, ch):
for _ch in ch_mask:
size = self.sample_size(ep, _ch)
val = 1 if _ch == ch else 0
for _ in xrange(0, size):
for _ in np.arange(0, size):
_mask.append(val)
return np.array(_mask)

Expand Down Expand Up @@ -1899,7 +1910,7 @@ def reshape_bytes(self, databytes, reshape, datatypes, order='<'):
# create the mask for each shape
shape_mask = list()
for shape in reshape:
for _ in xrange(1, shape + 1):
for _ in np.arange(1, shape + 1):
shape_mask.append(shape)

# create a set of masks to extract data
Expand Down Expand Up @@ -1955,7 +1966,7 @@ def load_channel_data(self, ep, ch):
# reshape bytes from the sample size
dt = np.dtype(numpy_map[sample_symbol])
dt.newbyteorder('<')
return np.frombuffer(raw.reshape([len(raw) / sample_size, sample_size]), dt)
return np.frombuffer(raw.reshape([int(len(raw) / sample_size), sample_size]), dt)

def apply_op(self, np_array, value, op_type):
"""
Expand Down Expand Up @@ -2460,7 +2471,7 @@ def get_blocks_stored_in_episode(self, ep):
if (blk_1 == blk_2) or (i_2 < i_1):
return [k for k in data_blocks if self.blocks.index(k) > i_1]
else:
return [k for k in data_blocks if self.blocks.index(k) in xrange(i_1, i_2)]
return [k for k in data_blocks if self.blocks.index(k) in range(i_1, i_2)]

def set_cyberk_blocks(self):
ck_blocks = list()
Expand Down Expand Up @@ -2530,10 +2541,9 @@ def sub_sampling(self, ep, ch):
return block.ks_block.k_sampling[ch - 1] if block.ks_block else 1

def aggregate_size(self, block, ep):
ag_count = self.aggregate_sample_count(block)
ag_size = 0
for ch in range(1, ag_count + 1):
if (block.ks_block.k_sampling[ch - 1] != 0):
for ch in range(1, len(block.ks_block.k_sampling)):
if block.ks_block.k_sampling[ch - 1] != 0:
ag_size += self.sample_size(ep, ch)
return ag_size

Expand Down Expand Up @@ -2654,7 +2664,7 @@ def aggregate_sample_count(self, block):
count = 0
for i in range(0, block.ep_block.n_channels):
if block.ks_block.k_sampling[i] > 0:
count += lcm0 / block.ks_block.k_sampling[i]
count += int(lcm0 / block.ks_block.k_sampling[i])

return count

Expand Down Expand Up @@ -3017,6 +3027,8 @@ def create_sub_block(self, block, sub_offset):
self.file.seek(sub_offset)
sub_ident_size = read_from_char(self.file, 'B')
sub_identifier, = struct.unpack('<%ss' % sub_ident_size, self.file.read(sub_ident_size))
if hasattr(sub_identifier, 'decode'):
sub_identifier = sub_identifier.decode()
sub_data_size = read_from_char(self.file, 'H')
sub_data_offset = sub_offset + sub_ident_size + 3
size_format = "H"
Expand Down Expand Up @@ -3101,6 +3113,8 @@ def create_header(self, layout):
def create_block(self, layout, offset):
self.file.seek(offset)
ident_size, identifier = struct.unpack('<B15s', self.file.read(16))
if hasattr(identifier, 'decode'):
identifier = identifier.decode()
identifier = identifier[0:ident_size]
size = read_from_char(self.file, 'h')
block_type = self.select_block_subclass(identifier)
Expand Down Expand Up @@ -3139,6 +3153,8 @@ def create_header(self, layout):
def create_block(self, layout, offset):
self.file.seek(offset)
ident_size, identifier = struct.unpack('<B15s', self.file.read(16))
if hasattr(identifier, 'decode'):
identifier = identifier.decode()
# block title size is 7 or 15 bytes
# 7 is for sequence blocs
if identifier.startswith('DAC2SEQ'):
Expand Down Expand Up @@ -3186,6 +3202,8 @@ def create_block(self, layout, offset):
size = read_from_char(self.file, 'l')
ident_size = read_from_char(self.file, 'B')
identifier, = struct.unpack('<%ss' % ident_size, self.file.read(ident_size))
if hasattr(identifier, 'decode'):
identifier = identifier.decode()
block_type = self.select_block_subclass(identifier)
block = block_type(layout, identifier, offset, size, size_format='l')
self.file.seek(0)
Expand Down Expand Up @@ -3391,10 +3409,10 @@ def get_nomenclature(self):
"""
self.file.seek(0)
length, title = struct.unpack('<B15s', self.file.read(16))
self.file.seek(0)
title = title[0:length]
if hasattr(title, 'decode'):
title = title.decode()
self.file.seek(0)
title = title[0:length]
if title not in factories:
title = "format is not implemented ('{}' not in {})".format(
title, str(factories.keys()))
Expand Down Expand Up @@ -3810,7 +3828,7 @@ def read_block(self, lazy=False, ):

# create a segment containing all analog,
# tag and event channels for the episode
if self.elphy_file.n_episodes is None:
if self.elphy_file.n_episodes in [None, 0]:
print("File '%s' appears to have no episodes" % (self.filename))
return block
for episode in range(1, self.elphy_file.n_episodes + 1):
Expand Down Expand Up @@ -4205,13 +4223,14 @@ def read_segment(self, episode):
# each channel in the episode
for channel in range(1, self.elphy_file.n_channels(episode) + 1):
signal = self.elphy_file.get_signal(episode, channel)
x_unit = signal.x_unit.strip().decode()
analog_signal = AnalogSignal(
signal.data['y'],
units=signal.y_unit,
t_start=signal.t_start * getattr(pq, signal.x_unit.strip()),
t_stop=signal.t_stop * getattr(pq, signal.x_unit.strip()),
t_start=signal.t_start * getattr(pq, signal.x_unit.strip().decode()),
t_stop=signal.t_stop * getattr(pq, signal.x_unit.strip().decode()),
# sampling_rate = signal.sampling_frequency * pq.kHz,
sampling_period=signal.sampling_period * getattr(pq, signal.x_unit.strip()),
sampling_period=signal.sampling_period * getattr(pq, x_unit),
channel_name="episode {}, channel {}".format(int(episode + 1), int(channel + 1))
)
analog_signal.segment = segment
Expand Down
33 changes: 33 additions & 0 deletions neo/test/iotest/test_elphyio.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
"""
Tests of neo.io.elphyo
"""

import unittest

from neo.io import ElphyIO
from neo.test.iotest.common_io_test import BaseTestIO


class TestElphyIO(BaseTestIO, unittest.TestCase):
    """Run the common Neo IO test battery plus a read smoke test for ElphyIO."""

    ioclass = ElphyIO
    entities_to_download = ['elphy']
    # Sample Elphy recordings exercised by the smoke test below.
    entities_to_test = [
        'elphy/' + name
        for name in ('DATA1.DAT',
                     'ElphyExample.DAT',
                     'ElphyExample_Mode1.dat',
                     'ElphyExample_Mode2.dat',
                     'ElphyExample_Mode3.dat')
    ]

    def test_read_data(self):
        """Each sample file must produce a block with at least one segment
        that contains at least one data object."""
        for entity in self.entities_to_test:
            reader = ElphyIO(self.get_local_path(entity))
            block = reader.read_block()

            self.assertTrue(len(block.segments) > 0)
            # At least one data container of the first segment must be non-empty.
            self.assertTrue(any(list(block.segments[0].size.values())))


if __name__ == "__main__":
unittest.main()