ihkapy.fileio.binary_io

# Copyright (C) 2004-2011 by Michaël Zugaro
  2#
  3# This program is free software; you can redistribute it and/or modify
  4# it under the terms of the GNU General Public License as published by
  5# the Free Software Foundation; either version 3 of the License, or
  6# (at your option) any later version.
  7#
  8# 22/04/2022 Modified and translated to Python (3.9.5) 
  9#            by Stephen Fay. Contact: dcxstephen@gmail.com 
 10# 03/20/2014 Modified by John D. Long to use only built-in Matlab 8.1
 11#            functions. Contact: jlong29@gmail.com
 12
 13import os                        # I/O
 14import numpy as np               # Scientific computing
 15import logging                   # Debug
 16from tqdm import tqdm            # Progress Bar
 17from contextlib import ExitStack # Context manager for opening many files at once
 18
 19# # Init logger and set the logging level
 20# logging.basicConfig(level=logging.DEBUG)
 21logging.basicConfig(level=logging.INFO)
 22logger = logging.getLogger(__name__)
 23# logger.setLevel(logging.DEBUG) # DEBUG < INFO < WARNING < ERROR < CRITICAL
 24
 25# Constant, used in _load_binary (=> it's parent load_binary too) and merge_dats
 26MAX_SAMPLES_PER_CHUNK = 10000 
 27
 28# Helper, used in other modules too (don't repeat yourself principle) 
 29def get_n_samples_from_dur_fs(dur,fs):
 30    """Utility, get the number of samples in a time window."""
 31    return int(dur * fs + 0.5) 
 32
 33def load_binary_multiple_segments(
 34        file_path       : str,
 35        n_chan          : int = 1,
 36        sample_rate     : int = None,
 37        offset_times    : list = [], 
 38        duration_time   : float or None = None,
 39        offset_sizes    : list = [],
 40        duration_size   : int or None = None,
 41        channels        : list = [],
 42        precision       : type = "int16"
 43        ) -> np.ndarray:
 44    """Load many segments of data from multiplexed binary file.
 45
 46    Either provide a list of offset times and a duration time in seconds
 47    XOR provide a list of offset sizes and a duration size for the segment
 48    in number of samples. 
 49
 50    Parameters
 51    ----------
 52    `file_path : str`
 53        Path to a .dat binary file
 54
 55    `n_chan : int`
 56        Number of data channels in the file (defaults to 1)
 57
 58    `sample_rate : int or float`
 59        Sample rate in Hz, (aka fs, frequency, sr is the MNE convention)
 60        Defaults to None, if none, must specify offset_size and duration_size
 61
 62    `offset_times : list or np.ndarray`
 63        Positions to start reading in seconds, (aka start_time), (defaults to empty)
 64
 65    `duration_time : float or None = None `
 66        Duration to read in seconds (per channel) (defaults to None)
 67
 68    `offset_sizes : list or np.ndarray`
 69        Positions to start reading in num of samples, defaults to empty.
 70
 71    `duration_size : int or None`
 72        Duration to read in number of samples (per channel) (defaults to None)
 73
 74    `channels : list `
 75        Indices of channels to read from, defaults to empty and uses all chs.
 76
 77    `precision : str`
 78        Sample precision, defaults to 'int16'.
 79
 80
 81    Returns
 82    -------
 83    `numpy.ndarray`
 84        A 3d array containg the segments' data, with shape 
 85        (n_segments , n_samples , n_binary_channels)
 86    """
 87    # If required, convert time to n_samples (aka sizes) 
 88    if list(offset_times): # falsy
 89        assert duration_time is not None, "Duration time must be specified"
 90        assert duration_time > 0 , "Duration time must be specified"
 91        assert not offset_sizes, "Cannot specify both times and sizes" 
 92        assert not duration_size, "Cannot specify both times and sizes"
 93        offset_sizes = [get_n_samples_from_dur_fs(dt,sample_rate) for dt in offset_times]
 94        duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate)
 95    assert list(offset_sizes)
 96    assert duration_size > 0
 97    if not channels: channels = [i for i in range(n_chan)]
 98    # TODO: check whether they are integer values? Prob not necessary
 99    # TODO: check channels are valid ints in valid range
100
101    n_segments = len(offset_sizes) # the number of segments 
102    # Allocate space in memory
103    segments_data = np.zeros((n_segments, duration_size, len(channels)),dtype=precision) 
104    for idx,offset_size in enumerate(offset_sizes):
105        segments_data[idx,:,:] = load_binary(
106                file_path,
107                n_chan,
108                sample_rate,
109                offset_size=offset_size,
110                duration_size=duration_size,
111                channels=channels,
112                precision=precision)
113    return segments_data
114
def load_binary(
        file_path : str,
        n_chan : int = 1,
        sample_rate : int = None,
        offset_time : float = None,
        duration_time : float = None,
        offset_size : int = None,
        duration_size : int = None,
        channels : list = None,
        precision : str = "int16") -> np.ndarray:
    """Load data from a multiplexed binary file.

    Reading a subset of data can be done in two different manners:
    either by specifying start time ("offset_time") and duration ("duration_time")
    (more intuitive), or by indicating the position ("offset_size") and size of
    the subset in terms of number of samples per channel ("duration_size")
    (more accurate). The function will raise an error if both 'time' and 'size'
    arguments are provided, this is to avoid ambiguity.

    Parameters
    ----------
    `file_path : str`
        Path to a .dat binary file

    `n_chan : int`
        Number of data channels in the file (defaults to 1)

    `sample_rate : int or float`
        Sample rate in Hz, (aka fs, frequency, sr is the MNE convention)
        Defaults to None; if None, must specify offset_size and duration_size

    `offset_time : int or float or None`
        Position to start reading in seconds, (aka start_time) (defaults to None)

    `duration_time : int or float or None`
        Duration to read in seconds, (defaults to Inf)

    `offset_size : int or None`
        Position to start reading in samples (per channel) (defaults to None)

    `duration_size : int or None`
        Duration to read in number of samples (per channel) (defaults to None)

    `channels : list or None`
        Indices of channels to read from; defaults to None, if None uses all chs.

    `precision : str`
        Sample precision, defaults to 'int16'.

    Returns
    -------
    `numpy.ndarray`
        A 2d array containing the specified segment's data,
        shape (n_samples , len(channels)).
    """
    # Checks to make sure the input is correct
    assert n_chan == int(n_chan)
    assert n_chan >= 1
    logger.debug(f"{n_chan} channel(s) in this binary file")
    assert os.path.exists(file_path) , f"{file_path} appears not to exist."
    if sample_rate is not None:
        assert sample_rate > 0 , f"Sample rate must be positive {sample_rate}"
    if channels:
        assert len(channels) <= n_chan , "Too many channels passed"
        assert len(set(channels)) == len(channels) , "Repeating channels"
        for chan in channels:
            assert 0 <= chan < n_chan , "Channel out of range"
            assert int(chan) == chan , "Wrong type, must be int"
    else:
        # None or empty -> read every channel (fixes mutable default [])
        channels = list(range(n_chan))

    # Either all four args are None -> read whole file, xor:
    #     offset_time,duration_time xor offset_size,duration_size
    #     are both None (not just Falsy!)
    if sample_rate is None:
        assert (offset_time, duration_time) == (None,)*2
    if (offset_time, duration_time, offset_size, duration_size) == (None,)*4:
        offset_size = 0
        duration_size = np.inf
    elif (offset_time, duration_time) == (None,)*2:
        if offset_size is None: offset_size = 0
        if duration_size is None: duration_size = np.inf
    elif (offset_size, duration_size) == (None,)*2:
        assert sample_rate
        offset_size = 0
        duration_size = np.inf
        if offset_time:
            offset_size = get_n_samples_from_dur_fs(offset_time,sample_rate)
        if duration_time:
            duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate)
    else:
        raise Exception("Invalid Argument Combination!\nYou cannot specify both size-like and a time-like arguments for the duration and offset.")
    assert offset_size >= 0 and int(offset_size) == offset_size , f"Bad offset {offset_size}"
    assert duration_size > 0 , f"Non-positive duration size {duration_size}"

    # Figure out what the data offset is in bytes
    bytes_per_sample = np.dtype(precision).itemsize
    fsize_bytes = os.path.getsize(file_path)        # file size in num of bytes
    fsize_samples = fsize_bytes // bytes_per_sample # file size in num of samples
    # File must hold a whole number of samples at this precision
    assert fsize_bytes / bytes_per_sample == fsize_samples
    fsize_samples_tail = fsize_samples - offset_size

    # Make sure duration_size is compatible with file size and offset
    if duration_size == np.inf:
        # Read everything from the offset to the end of the file
        logger.info("duration_size is np.inf")
        duration_size = fsize_samples_tail // n_chan
        # Bug fix: message previously referenced undefined name `nchan`
        assert fsize_samples_tail / n_chan == duration_size , f"Incompatability of parameters with shape of file. Either n_chan={n_chan} is incorrect or your file {file_path} is corrupted."
    else:
        # Bug fix: message previously referenced undefined name `file_name`
        assert duration_size * n_chan <= fsize_samples_tail , f"Duration size ={duration_size} and offset={offset_size} exceed the end of the file {file_path}"

    data_offset = offset_size * n_chan * bytes_per_sample
    n_samples = duration_size # number of samples per channel

    return _load_binary(file_path,n_chan,n_samples,precision,data_offset)[:,channels]
227
228
def _load_binary(
        file_path : str,
        n_chan : int,
        n_samples : int,
        precision : type,
        data_offset : int = 0) -> np.ndarray: 
    """Helper for load_binary; this is the method that contains the logic.

    Reads n_samples rows of n_chan values starting at byte `data_offset`,
    in bounded chunks so large files are streamed rather than read at once.

    Parameters
    ----------
    `file_path : str`
        Path to binary file with multiplexed data.

    `n_chan : int`
        The number of channels. 

    `n_samples : int`
        The number of units (samples/measurements) per channel 

    `precision : type` 
        The precision of the binary data: type or str rep of type.
        e.g. numpy.int16 or "int16" are both valid

    `data_offset : int`
        Byte position at which reading starts (passed to file.seek).

    Returns
    -------
    `np.ndarray`
        the loaded segment of size (n_samples , n_chan)
    """
    total_n_samples = n_samples * n_chan 
    with open(file_path , "rb") as file:
        # Rem.  data_offset: uint = 
        #           start_time * sample_rate * n_chan * bytes_per_sample
        # Rem.  bytes_per_sample = np.dtype(precision).itemsize
        file.seek(data_offset)
        if total_n_samples <= MAX_SAMPLES_PER_CHUNK:
            # Small request: one read covers everything
            data = _load_chunk(file,n_chan,n_samples,precision)
        else:
            # Preallocate memory
            data = np.zeros((n_samples , n_chan) , dtype=precision)
            # Read all chunks
            # Largest multiple of n_chan not exceeding MAX_SAMPLES_PER_CHUNK;
            # used as the per-channel row count of each full chunk
            n_samples_per_chunk = MAX_SAMPLES_PER_CHUNK // n_chan * n_chan
            n_chunks = n_samples // n_samples_per_chunk 
            if not n_chunks: m=0 # extreme rare case, required define m for assertion
            for j in range(n_chunks):
                d =  _load_chunk(file,n_chan,n_samples_per_chunk,precision)
                m,_ = d.shape # m == n_samples_per_chunk; reused below in the sanity check
                data[j*m:(j+1)*m , :] = d
            # If data size not multiple of chunk size, read remainder
            remainder = n_samples - n_chunks * n_samples_per_chunk
            if remainder:
                d = _load_chunk(file,n_chan,remainder,precision)
                m_rem,_ = d.shape
                assert m_rem # sanity check: logically m_rem cannot be zero
                assert n_chunks*m == data.shape[0] - m_rem # sanity check
                data[-m_rem: , :] = d
    return data
288
289
def merge_dats(
        fpaths_in: list,
        dir_out: str,
        fname_out: str,
        precision: str = "int16"
        ):
    """Merges all binary files fnames from the directory dir_in.

    The files are interleaved sample-by-sample: output sample k of output
    channel i comes from sample k of fpaths_in[i]. Returns nothing (void).

    Parameters
    ----------
    fpaths_in : list
        The ordered list of binary file paths (names) we are merging.

    dir_out : str
        The directory we want to save the output to.

    fname_out : str
        The name of the output file we are saving in dir_out
        (including the extension, e.g. '.bin' or '.dat')

    precision : str (optional, defaults to "int16")
        The precision of the data stored in our binary files e.g. "int16"
    """
    # Fail fast instead of a confusing TypeError (None % int) further down
    assert fpaths_in , "Must provide at least one input file"
    assert os.path.exists(dir_out)
    # Assert that all the binary files exist and have equal num of bytes
    # Also, get the size of all the files, in bytes
    size_in_bytes = _assert_all_files_same_size(fpaths_in)
    fpath_out = os.path.join(dir_out,fname_out)

    # Define loading parameters
    n_files = len(fpaths_in) # Equal to number of channels in the output file
    bytes_per_sample = np.dtype(precision).itemsize
    assert size_in_bytes % bytes_per_sample == 0 # Sanity check
    n_samples = size_in_bytes // bytes_per_sample # Number of samples in each file
    # n_chunks = num of full chunks we need to load (there will be a remainder)
    chunk_size = MAX_SAMPLES_PER_CHUNK
    n_chunks = n_samples // chunk_size
    remainder_chunksize = n_samples % chunk_size # In n of samples

    logger.info("Started merging files...")
    with ExitStack() as stack, open(fpath_out,"wb") as f_out:
        files = [stack.enter_context(open(fpath,"rb")) for fpath in fpaths_in]

        # Data buffer: one column per input file, flushed to f_out per chunk
        d_buffer = np.zeros([chunk_size,n_files],dtype=precision)
        for _ in tqdm(range(n_chunks)): # tqdm is a progress bar
            # Load a chunk from each of the files we are merging into memory
            for idx,f in enumerate(files):
                d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,chunk_size,precision))
            # Interleave the channels (row-major flatten) and write to file;
            # tobytes() already yields bytes at the buffer's dtype/precision
            f_out.write(d_buffer.flatten().tobytes())

        # Add the left over chunk
        if remainder_chunksize:
            d_buffer = np.zeros([remainder_chunksize,n_files],dtype=precision)
            for idx,f in enumerate(files):
                d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,remainder_chunksize,precision))
                # Verify that we truly have reached the end of the file
                assert not f.read(1), "Logic Error! Wrongly calculated file size."
            # Combine the chunks and write them to file
            f_out.write(d_buffer.flatten().tobytes())
    logger.info("...Done merging files.")
    return
356
357
# Helper, make sure all files contain the same number of bytes
def _assert_all_files_same_size(filepaths:list):
    """Assert every path in `filepaths` exists and has identical byte size.

    Returns the common size in bytes, or None when `filepaths` is empty.
    """
    if len(filepaths) == 0:
        logger.debug("Zero files provided")
        return None
    # Bug fix: original called os.path.exists() and discarded the result
    assert os.path.exists(filepaths[0]) , f"{filepaths[0]} does not exist"
    size = os.path.getsize(filepaths[0]) # they should all be same size
    for fpath in filepaths[1:]:
        assert os.path.exists(fpath) , f"{fpath} does not exist"
        logger.debug(f"size: {size}")
        logger.debug(f"getsize fpath: {os.path.getsize(fpath)}")
        logger.debug(f"fpath = '{fpath}'")
        assert size == os.path.getsize(fpath) , f"{fpath} differs in size"
    logger.debug(f"All {len(filepaths)} files passed are of size={size} bytes")
    return size
373
374
375def _load_chunk(
376        file,
377        n_chan : int,
378        n_samples : int,
379        precision : type) -> np.ndarray:
380    """Helper. Loads a chunk of size (n_chan,n_samples) from buffered reader f.
381    
382    This is really just a MatLab-esque wrapper for numpy.fromfile().
383
384    Parameters
385    ----------
386    `file : an io file buffered reader object`
387        The binary file that you are reading. The python built-in type
388        that you get from open(file_path , "rb")
389
390    `n_chan : int`
391        The number of channels. 
392
393    `n_samples : int`
394        The number of units (measurements) in the sample 
395
396    `precision : type (or a str representation of a valid type)`
397        The precision of the binary data, 
398        e.g. numpy.int16 or "int16" are both valid
399    
400    Returns
401    -------
402    `numpy.ndarray`
403        2D array of shape (n_chan , n_samples)
404        If the binary file contains an ascending sequence [0,1,2,3,...]
405        then calling _load_chunk with n_chan = 2 and n_samples = 3 will
406        result in the following array [[0, 1], [2, 3], [4, 5]]
407    """
408    d = np.fromfile(
409        file,
410        dtype = precision,
411        count = n_chan * n_samples).reshape((n_samples,n_chan))
412    assert (n_samples,n_chan) == d.shape , f"Incompatible size (({n_samples},{n_chan}) == {d.shape})"
413    return d
414
415
416
if __name__=="__main__":
    # TESTS
    # Inline smoke tests: each section writes a temporary binary file,
    # reads it back through the function under test, and asserts equality.
    import array
    
    ### Test _load_chunk
    logger.debug("Testing _load_chunk()...")
    # Write a binary file ("h" = signed 16-bit, matches precision="int16")
    arrin = array.array("h" , np.arange(50))
    with open("temp_test.dat","wb") as file:
        arrin.tofile(file)
    # Read the binary file we just created with _load_chunk
    with open("temp_test.dat","rb") as file:
        arrout = _load_chunk(file, n_chan=2, n_samples=5, precision="int16")
    # Assert that result must be equal to matlab version
    assert (arrout == np.array([[0,1],[2,3],[4,5],[6,7],[8,9]])).all()
    # Remove temp binary file
    os.remove("temp_test.dat")
    logger.debug("_load_chunk() passed all tests.")


    ### Test load_binary
    logger.debug("Testing load_binary()...")
    # Write a binary file
    arrin = array.array("h" , np.arange(50))
    with open("temp_test.dat","wb") as file:
        arrin.tofile(file)
    # NOTE(review): the `with open(...)` handle below is never used —
    # load_binary opens the file itself from fname; harmless but redundant
    def load(fname="temp_test.dat" , **kwargs): # helper
        with open(fname,"rb") as file:
            arrout = load_binary(fname,**kwargs)
        return arrout
    arrout = load(n_chan=2)
    assert (arrout == np.arange(50).reshape(25,2)).all()
    arrout = load(n_chan=2,offset_size=2,duration_size=2)
    assert (arrout == np.arange(4,8).reshape(2,2)).all()
    arrout = load(n_chan=5,sample_rate=1,offset_time=5,duration_time=3)
    assert (arrout == np.arange(25,40).reshape(3,5)).all()
    # Remove temp binary file
    os.remove("temp_test.dat")
    logger.debug("load_binary() passed all tests.")


    ### Test load_binary_multiple_segments
    logger.debug("Testing load_binary_multiple_segments()...")
    # Write a binary file
    arrin = array.array("h", np.arange(500))
    with open("temp_test.dat","wb") as file:
        arrin.tofile(file)
    # NOTE(review): same unused file handle pattern as `load` above
    def load_segs(fname="temp_test.dat" , **kwargs): # helper
        with open(fname,"rb") as file:
            arrout = load_binary_multiple_segments(fname, **kwargs)
        return arrout
    arrout = load_segs(n_chan=2,sample_rate=2,offset_times=[10,20,30],duration_time=5,channels=[0,1],precision="int16")
    assert (arrout.shape == np.array([3,10,2])).all()
    assert (arrout[0,:,:] == np.arange(40,60).reshape(10,2)).all()
    assert (arrout[2,:,:] == np.arange(120,140).reshape(10,2)).all()
    # Same array load, therefore same tests, but with different args
    arrout = load_segs(n_chan=2,sample_rate=2,offset_sizes=[20,40,60],duration_size=10,precision="int16") 
    assert (arrout.shape == np.array([3,10,2])).all()
    assert (arrout[0,:,:] == np.arange(40,60).reshape(10,2)).all()
    assert (arrout[2,:,:] == np.arange(120,140).reshape(10,2)).all()
    logger.debug("load_binary_multiple_segments() all tests Passed")
    
    
    ### Test merge_dats() 
    # by nature, merge_dats() contains some very dense code 
    logger.debug("Testing merge_dats()...")
    # TEST 1
    # Create some binary files: evens in arr1, odds in arr2, so the merged
    # (interleaved) output should be the full ascending sequence 0..15
    arr1 = np.asarray([0,2,4,6,8,10,12,14])
    arr2 = np.asarray([1,3,5,7,9,11,13,15])
    arr1.astype('int16').tofile("./arr1.dat")
    arr2.astype('int16').tofile("./arr2.dat")
    # Merge them
    merge_dats(
            fpaths_in=["./arr1.dat","./arr2.dat"],
            dir_out="./",
            fname_out="arrs1and2.dat",
            precision="int16")
    # Examine outfile
    merged = np.fromfile("./arrs1and2.dat",dtype="int16")
    expected_merged = np.arange(16,dtype="int16") 
    print(f"Merged dats: {merged}")
    print(f"Expected: {expected_merged}")
    for i,j in zip(merged,expected_merged): assert i==j
    # TEST 2
    # Create some binary files large enough to exercise both the full-chunk
    # loop and the remainder branch of merge_dats
    arr1 = np.arange(0,2*MAX_SAMPLES_PER_CHUNK,2) % 32768 # max val int16
    arr2 = np.arange(1,2*MAX_SAMPLES_PER_CHUNK,2) % 32768 # max val int16
    arr1.astype('int16').tofile("./arr1.dat")
    arr2.astype('int16').tofile("./arr2.dat")
    # Merge them
    merge_dats(
            fpaths_in=["./arr1.dat","./arr2.dat"],
            dir_out="./",
            fname_out="arrs1and2.dat",
            precision="int16")
    merged = np.fromfile("./arrs1and2.dat",dtype="int16")
    expected_merged = np.arange(0,2*MAX_SAMPLES_PER_CHUNK) % 32768
    # Examine outfile
    for i,j in zip(merged,expected_merged): assert i==j
    # Clean up
    os.remove("./arr1.dat")
    os.remove("./arr2.dat")
    os.remove("./arrs1and2.dat")


    # TODO: Write test for _load_binary()
525    
def get_n_samples_from_dur_fs(dur, fs)
30def get_n_samples_from_dur_fs(dur,fs):
31    """Utility, get the number of samples in a time window."""
32    return int(dur * fs + 0.5) 

Utility, get the number of samples in a time window.

def load_binary_multiple_segments( file_path: str, n_chan: int = 1, sample_rate: int = None, offset_times: list = [], duration_time: float = None, offset_sizes: list = [], duration_size: int = None, channels: list = [], precision: type = 'int16') -> numpy.ndarray:
 34def load_binary_multiple_segments(
 35        file_path       : str,
 36        n_chan          : int = 1,
 37        sample_rate     : int = None,
 38        offset_times    : list = [], 
 39        duration_time   : float or None = None,
 40        offset_sizes    : list = [],
 41        duration_size   : int or None = None,
 42        channels        : list = [],
 43        precision       : type = "int16"
 44        ) -> np.ndarray:
 45    """Load many segments of data from multiplexed binary file.
 46
 47    Either provide a list of offset times and a duration time in seconds
 48    XOR provide a list of offset sizes and a duration size for the segment
 49    in number of samples. 
 50
 51    Parameters
 52    ----------
 53    `file_path : str`
 54        Path to a .dat binary file
 55
 56    `n_chan : int`
 57        Number of data channels in the file (defaults to 1)
 58
 59    `sample_rate : int or float`
 60        Sample rate in Hz, (aka fs, frequency, sr is the MNE convention)
 61        Defaults to None, if none, must specify offset_size and duration_size
 62
 63    `offset_times : list or np.ndarray`
 64        Positions to start reading in seconds, (aka start_time), (defaults to empty)
 65
 66    `duration_time : float or None = None `
 67        Duration to read in seconds (per channel) (defaults to None)
 68
 69    `offset_sizes : list or np.ndarray`
 70        Positions to start reading in num of samples, defaults to empty.
 71
 72    `duration_size : int or None`
 73        Duration to read in number of samples (per channel) (defaults to None)
 74
 75    `channels : list `
 76        Indices of channels to read from, defaults to empty and uses all chs.
 77
 78    `precision : str`
 79        Sample precision, defaults to 'int16'.
 80
 81
 82    Returns
 83    -------
 84    `numpy.ndarray`
 85        A 3d array containg the segments' data, with shape 
 86        (n_segments , n_samples , n_binary_channels)
 87    """
 88    # If required, convert time to n_samples (aka sizes) 
 89    if list(offset_times): # falsy
 90        assert duration_time is not None, "Duration time must be specified"
 91        assert duration_time > 0 , "Duration time must be specified"
 92        assert not offset_sizes, "Cannot specify both times and sizes" 
 93        assert not duration_size, "Cannot specify both times and sizes"
 94        offset_sizes = [get_n_samples_from_dur_fs(dt,sample_rate) for dt in offset_times]
 95        duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate)
 96    assert list(offset_sizes)
 97    assert duration_size > 0
 98    if not channels: channels = [i for i in range(n_chan)]
 99    # TODO: check whether they are integer values? Prob not necessary
100    # TODO: check channels are valid ints in valid range
101
102    n_segments = len(offset_sizes) # the number of segments 
103    # Allocate space in memory
104    segments_data = np.zeros((n_segments, duration_size, len(channels)),dtype=precision) 
105    for idx,offset_size in enumerate(offset_sizes):
106        segments_data[idx,:,:] = load_binary(
107                file_path,
108                n_chan,
109                sample_rate,
110                offset_size=offset_size,
111                duration_size=duration_size,
112                channels=channels,
113                precision=precision)
114    return segments_data

Load many segments of data from multiplexed binary file.

Either provide a list of offset times and a duration time in seconds XOR provide a list of offset sizes and a duration size for the segment in number of samples.

Parameters

file_path : str Path to a .dat binary file

n_chan : int Number of data channels in the file (defaults to 1)

sample_rate : int or float Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) Defaults to None, if none, must specify offset_size and duration_size

offset_times : list or np.ndarray Positions to start reading in seconds, (aka start_time), (defaults to empty)

duration_time : float or None = None Duration to read in seconds (per channel) (defaults to None)

offset_sizes : list or np.ndarray Positions to start reading in num of samples, defaults to empty.

duration_size : int or None Duration to read in number of samples (per channel) (defaults to None)

channels : list Indices of channels to read from, defaults to empty and uses all chs.

precision : str Sample precision, defaults to 'int16'.

Returns

numpy.ndarray A 3d array containg the segments' data, with shape (n_segments , n_samples , n_binary_channels)

def load_binary( file_path: str, n_chan: int = 1, sample_rate: int = None, offset_time: float = None, duration_time: float = None, offset_size: int = None, duration_size: int = None, channels: list = [], precision: type = 'int16') -> numpy.ndarray:
116def load_binary(
117        file_path : str,
118        n_chan : int = 1,
119        sample_rate : int = None,
120        offset_time : float = None,
121        duration_time : float = None,
122        offset_size : int = None,
123        duration_size : int = None,
124        channels : list = [],
125        precision : type = "int16") -> np.ndarray:
126    """Load data from a multiplexed binary file.
127
128    Reading a subset of data can be done in two different manners: 
129    either by specifying start time ("offset_time") and duration ("duration_time") 
130    (more intuitive), or by indicating the position ("offset_size") and size of 
131    the subset in terms of number of samples per channel ("duration_size") 
132    (more accurate). The function will raise an error if both 'time' and 'size'
133    arguments are provided, this is to avoid ambiguity. 
134
135    Parameters
136    ----------
137    `file_path : str`
138        Path to a .dat binary file
139
140    `n_chan : int`
141        Number of data channels in the file (defaults to 1)
142
143    `sample_rate : int or float`
144        Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) 
145        Defaults to None, if none, must specify offset_size and duration_size
146
147    `offset_time : int or float or None`
148        Position to start reading in seconds, (aka start_time) (defaults to None)
149
150    `duration_time : int or float or None`
151        Duration to read in seconds, (defaults to Inf)
152
153    `offset_size : int or None`
154        Position to start reading in samples (per channel) (defaults to None)
155
156    `duration_size : int or None`
157        Duration to read in number of samples (per channel) (defaults to None)
158
159    `channels : list or None`
160        Indices of channels to read from, defaults to None, if None uses all chs. 
161
162    `precision : str`
163        Sample precision, defaults to 'int16'.
164
165    Returns
166    -------
167    `numpy.ndarray`
168        A 2d array containg the specified segment's data. (1d if only one chan)
169    """
170    # Checks to make sure the intput is correct
171    assert n_chan == int(n_chan)
172    assert n_chan >= 1
173    logger.debug(f"{n_chan} channel(s) in this binary file") 
174    assert os.path.exists(file_path) , f"{file_path} appears not to exist."
175    if sample_rate is not None: assert sample_rate > 0 , f"Sample rate must be positive {sample_rate}"
176    if channels: 
177        assert len(channels) <= n_chan , "Too many channels passed"
178        assert len(set(channels)) == len(channels) , "Repeating channels"
179        for chan in channels: 
180            assert chan < n_chan and chan >= 0 , "Channel out of range"
181            assert int(chan) == chan , "Wrong type, must be int"
182    else: channels = [i for i in range(n_chan)]
183
184    # Either all four args are none -> read whole file xor:
185    #     offset_time,duration_time xor offset_size,duration_size
186    #     are both None (not just Falsy!)
187    if sample_rate == None: assert (offset_time,duration_time)==(None,)*2
188    if (offset_time,duration_time,offset_size,duration_size)==(None,)*4:
189        offset_size = 0
190        duration_size = np.inf
191    elif (offset_time,duration_time) == (None,)*2:
192        if offset_size == None: offset_size = 0
193        if duration_size == None: duration_size = np.inf
194    elif (offset_size,duration_size) == (None,)*2:
195        assert sample_rate
196        offset_size = 0
197        duration_size = np.inf
198        if offset_time: 
199            offset_size = get_n_samples_from_dur_fs(offset_time,sample_rate)
200        if duration_time: 
201            duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate)
202    else:
203        raise Exception("Invalid Argument Combination!\nYou cannot specify both size-like and a time-like arguments for the duration and offset.")
204    assert offset_size >= 0 and int(offset_size) == offset_size , f"Bad offset {offset_size}"
205    assert duration_size > 0 , f"Non-positive duration size {duration_size}"
206
207        
208    # Figure out what the data offset is in bytes
209    bytes_per_sample = np.dtype(precision).itemsize
210    fsize_bytes = os.path.getsize(file_path)        # file size in num of bytes
211    fsize_samples = fsize_bytes // bytes_per_sample # file size in num of samples
212    assert fsize_bytes / bytes_per_sample == fsize_samples
213    fsize_samples_tail = fsize_samples - offset_size
214
215    # Make sure duration_size is compatible with file size and offset
216    if duration_size == np.inf:
217        logger.info("duration_size is np.inf")
218        duration_size = fsize_samples_tail // n_chan
219        assert fsize_samples_tail / n_chan == duration_size , f"Incompatability of parameters with shape of file. Either n_chan={nchan} is incorrect or your file {file_path} is corrupted."
220    else: 
221        assert duration_size * n_chan <= fsize_samples_tail , f"Duration size ={duration_size} and offset={offset_size} exceed the end of the file {file_name}"
222
223
224    data_offset = offset_size * n_chan * bytes_per_sample
225    n_samples = duration_size # number of samples per channel
226
227    return _load_binary(file_path,n_chan,n_samples,precision,data_offset)[:,channels]

Load data from a multiplexed binary file.

Reading a subset of data can be done in two different manners: either by specifying start time ("offset_time") and duration ("duration_time") (more intuitive), or by indicating the position ("offset_size") and size of the subset in terms of number of samples per channel ("duration_size") (more accurate). The function will raise an error if both 'time' and 'size' arguments are provided, this is to avoid ambiguity.

Parameters

file_path : str Path to a .dat binary file

n_chan : int Number of data channels in the file (defaults to 1)

sample_rate : int or float Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) Defaults to None, if none, must specify offset_size and duration_size

offset_time : int or float or None Position to start reading in seconds, (aka start_time) (defaults to None)

duration_time : int or float or None Duration to read in seconds, (defaults to Inf)

offset_size : int or None Position to start reading in samples (per channel) (defaults to None)

duration_size : int or None Duration to read in number of samples (per channel) (defaults to None)

channels : list or None Indices of channels to read from, defaults to None, if None uses all chs.

precision : str Sample precision, defaults to 'int16'.

Returns

numpy.ndarray A 2d array containg the specified segment's data. (1d if only one chan)

def merge_dats( fpaths_in: list, dir_out: str, fname_out: str, precision: str = 'int16')
def merge_dats(
        fpaths_in: list,
        dir_out: str,
        fname_out: str,
        precision: str = "int16"
        ):
    """Merges all binary files fpaths_in into one multiplexed binary file.

    Reads every input file in bounded-size chunks and interleaves the
    samples (sample-major, channel-minor) so the output is a standard
    multiplexed .dat file with one channel per input file.

    Returns nothing (void).

    Parameters
    ----------
    fpaths_in : list
        The ordered list of binary file paths (names) we are merging.
        The order determines the channel order in the output file.

    dir_out : str
        The directory we want to save the output to.

    fname_out : str
        The name of the output file we are saving in dir_out
        (including the extension, e.g. '.bin' or '.dat')

    precision : str (optional, defaults to "int16")
        The precision of the data stored in our binary files e.g. "int16"
    """

    assert os.path.exists(dir_out)
    # Assert that all the binary files exist and have equal num of bytes
    # Also, get the size of all the files, in bytes
    size_in_bytes = _assert_all_files_same_size(fpaths_in)
    fpath_out = os.path.join(dir_out,fname_out)

    # Define loading parameters
    n_files = len(fpaths_in) # Equal to number of channels in the output file
    bytes_per_sample = np.dtype(precision).itemsize
    assert size_in_bytes % bytes_per_sample == 0 # Sanity check
    n_samples = size_in_bytes // bytes_per_sample # Number of samples in each file
    # Read in fixed-size chunks to bound memory usage; the final partial
    # chunk (if any) is handled separately after the main loop.
    chunk_size = MAX_SAMPLES_PER_CHUNK
    n_chunks = n_samples // chunk_size
    remainder_chunksize = n_samples % chunk_size # In n of samples

    logger.info("Started merging files...")
    with ExitStack() as stack, open(fpath_out,"wb") as f_out:
        files = [stack.enter_context(open(fpath,"rb")) for fpath in fpaths_in]

        # Data buffer: one column per input file (= one output channel)
        d_buffer = np.zeros([chunk_size,n_files],dtype=precision)
        for _ in tqdm(range(n_chunks)): # tqdm is a progress bar
            # Load a chunk from each of the files we are merging into memory
            for idx,f in enumerate(files):
                d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,chunk_size,precision))
            # Row-major flatten interleaves the channels sample-by-sample;
            # tobytes() serializes at the buffer's dtype, so the output is
            # written at the same precision as the input.
            f_out.write(d_buffer.flatten().tobytes())

        # Add the left over (partial) chunk
        if remainder_chunksize:
            d_buffer = np.zeros([remainder_chunksize,n_files],dtype=precision)
            for idx,f in enumerate(files):
                d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,remainder_chunksize,precision))
                # Verify that we truly have reached the end of the file
                assert not f.read(1), "Logic Error! Wrongly calculated file size."
            # Combine the chunks and write them to file
            f_out.write(d_buffer.flatten().tobytes())
    logger.info("...Done merging files.")
    return 

Merges all binary files fnames from the directory dir_in.

Returns nothing (void).

Parameters

fpaths_in : list The ordered list of binary file paths (names) we are merging.

dir_out : str The directory we want to save the output to.

fname_out : str The name of the output file we are saving in dir_out (including the extension, e.g. '.bin' or '.dat')

precision : str (optional, defaults to "int16") The precision of the data stored in our binary files e.g. "int16"