ihkapy.fileio.binary_io
1# Copyright (C) 2004-2011 by Michaƫl Zugaro 2# 3# This program is free software; you can redistribute it and/or modify 4# it under the terms of the GNU General Public License as published by 5# the Free Software Foundation; either version 3 of the License, or 6# (at your option) any later version. 7# 8# 22/04/2022 Modified and translated to Python (3.9.5) 9# by Stephen Fay. Contact: dcxstephen@gmail.com 10# 03/20/2014 Modified by John D. Long to use only built-in Matlab 8.1 11# functions. Contact: jlong29@gmail.com 12 13import os # I/O 14import numpy as np # Scientific computing 15import logging # Debug 16from tqdm import tqdm # Progress Bar 17from contextlib import ExitStack # Context manager for opening many files at once 18 19# # Init logger and set the logging level 20# logging.basicConfig(level=logging.DEBUG) 21logging.basicConfig(level=logging.INFO) 22logger = logging.getLogger(__name__) 23# logger.setLevel(logging.DEBUG) # DEBUG < INFO < WARNING < ERROR < CRITICAL 24 25# Constant, used in _load_binary (=> it's parent load_binary too) and merge_dats 26MAX_SAMPLES_PER_CHUNK = 10000 27 28# Helper, used in other modules too (don't repeat yourself principle) 29def get_n_samples_from_dur_fs(dur,fs): 30 """Utility, get the number of samples in a time window.""" 31 return int(dur * fs + 0.5) 32 33def load_binary_multiple_segments( 34 file_path : str, 35 n_chan : int = 1, 36 sample_rate : int = None, 37 offset_times : list = [], 38 duration_time : float or None = None, 39 offset_sizes : list = [], 40 duration_size : int or None = None, 41 channels : list = [], 42 precision : type = "int16" 43 ) -> np.ndarray: 44 """Load many segments of data from multiplexed binary file. 45 46 Either provide a list of offset times and a duration time in seconds 47 XOR provide a list of offset sizes and a duration size for the segment 48 in number of samples. 
49 50 Parameters 51 ---------- 52 `file_path : str` 53 Path to a .dat binary file 54 55 `n_chan : int` 56 Number of data channels in the file (defaults to 1) 57 58 `sample_rate : int or float` 59 Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) 60 Defaults to None, if none, must specify offset_size and duration_size 61 62 `offset_times : list or np.ndarray` 63 Positions to start reading in seconds, (aka start_time), (defaults to empty) 64 65 `duration_time : float or None = None ` 66 Duration to read in seconds (per channel) (defaults to None) 67 68 `offset_sizes : list or np.ndarray` 69 Positions to start reading in num of samples, defaults to empty. 70 71 `duration_size : int or None` 72 Duration to read in number of samples (per channel) (defaults to None) 73 74 `channels : list ` 75 Indices of channels to read from, defaults to empty and uses all chs. 76 77 `precision : str` 78 Sample precision, defaults to 'int16'. 79 80 81 Returns 82 ------- 83 `numpy.ndarray` 84 A 3d array containg the segments' data, with shape 85 (n_segments , n_samples , n_binary_channels) 86 """ 87 # If required, convert time to n_samples (aka sizes) 88 if list(offset_times): # falsy 89 assert duration_time is not None, "Duration time must be specified" 90 assert duration_time > 0 , "Duration time must be specified" 91 assert not offset_sizes, "Cannot specify both times and sizes" 92 assert not duration_size, "Cannot specify both times and sizes" 93 offset_sizes = [get_n_samples_from_dur_fs(dt,sample_rate) for dt in offset_times] 94 duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate) 95 assert list(offset_sizes) 96 assert duration_size > 0 97 if not channels: channels = [i for i in range(n_chan)] 98 # TODO: check whether they are integer values? 
Prob not necessary 99 # TODO: check channels are valid ints in valid range 100 101 n_segments = len(offset_sizes) # the number of segments 102 # Allocate space in memory 103 segments_data = np.zeros((n_segments, duration_size, len(channels)),dtype=precision) 104 for idx,offset_size in enumerate(offset_sizes): 105 segments_data[idx,:,:] = load_binary( 106 file_path, 107 n_chan, 108 sample_rate, 109 offset_size=offset_size, 110 duration_size=duration_size, 111 channels=channels, 112 precision=precision) 113 return segments_data 114 115def load_binary( 116 file_path : str, 117 n_chan : int = 1, 118 sample_rate : int = None, 119 offset_time : float = None, 120 duration_time : float = None, 121 offset_size : int = None, 122 duration_size : int = None, 123 channels : list = [], 124 precision : type = "int16") -> np.ndarray: 125 """Load data from a multiplexed binary file. 126 127 Reading a subset of data can be done in two different manners: 128 either by specifying start time ("offset_time") and duration ("duration_time") 129 (more intuitive), or by indicating the position ("offset_size") and size of 130 the subset in terms of number of samples per channel ("duration_size") 131 (more accurate). The function will raise an error if both 'time' and 'size' 132 arguments are provided, this is to avoid ambiguity. 
133 134 Parameters 135 ---------- 136 `file_path : str` 137 Path to a .dat binary file 138 139 `n_chan : int` 140 Number of data channels in the file (defaults to 1) 141 142 `sample_rate : int or float` 143 Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) 144 Defaults to None, if none, must specify offset_size and duration_size 145 146 `offset_time : int or float or None` 147 Position to start reading in seconds, (aka start_time) (defaults to None) 148 149 `duration_time : int or float or None` 150 Duration to read in seconds, (defaults to Inf) 151 152 `offset_size : int or None` 153 Position to start reading in samples (per channel) (defaults to None) 154 155 `duration_size : int or None` 156 Duration to read in number of samples (per channel) (defaults to None) 157 158 `channels : list or None` 159 Indices of channels to read from, defaults to None, if None uses all chs. 160 161 `precision : str` 162 Sample precision, defaults to 'int16'. 163 164 Returns 165 ------- 166 `numpy.ndarray` 167 A 2d array containg the specified segment's data. (1d if only one chan) 168 """ 169 # Checks to make sure the intput is correct 170 assert n_chan == int(n_chan) 171 assert n_chan >= 1 172 logger.debug(f"{n_chan} channel(s) in this binary file") 173 assert os.path.exists(file_path) , f"{file_path} appears not to exist." 174 if sample_rate is not None: assert sample_rate > 0 , f"Sample rate must be positive {sample_rate}" 175 if channels: 176 assert len(channels) <= n_chan , "Too many channels passed" 177 assert len(set(channels)) == len(channels) , "Repeating channels" 178 for chan in channels: 179 assert chan < n_chan and chan >= 0 , "Channel out of range" 180 assert int(chan) == chan , "Wrong type, must be int" 181 else: channels = [i for i in range(n_chan)] 182 183 # Either all four args are none -> read whole file xor: 184 # offset_time,duration_time xor offset_size,duration_size 185 # are both None (not just Falsy!) 
186 if sample_rate == None: assert (offset_time,duration_time)==(None,)*2 187 if (offset_time,duration_time,offset_size,duration_size)==(None,)*4: 188 offset_size = 0 189 duration_size = np.inf 190 elif (offset_time,duration_time) == (None,)*2: 191 if offset_size == None: offset_size = 0 192 if duration_size == None: duration_size = np.inf 193 elif (offset_size,duration_size) == (None,)*2: 194 assert sample_rate 195 offset_size = 0 196 duration_size = np.inf 197 if offset_time: 198 offset_size = get_n_samples_from_dur_fs(offset_time,sample_rate) 199 if duration_time: 200 duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate) 201 else: 202 raise Exception("Invalid Argument Combination!\nYou cannot specify both size-like and a time-like arguments for the duration and offset.") 203 assert offset_size >= 0 and int(offset_size) == offset_size , f"Bad offset {offset_size}" 204 assert duration_size > 0 , f"Non-positive duration size {duration_size}" 205 206 207 # Figure out what the data offset is in bytes 208 bytes_per_sample = np.dtype(precision).itemsize 209 fsize_bytes = os.path.getsize(file_path) # file size in num of bytes 210 fsize_samples = fsize_bytes // bytes_per_sample # file size in num of samples 211 assert fsize_bytes / bytes_per_sample == fsize_samples 212 fsize_samples_tail = fsize_samples - offset_size 213 214 # Make sure duration_size is compatible with file size and offset 215 if duration_size == np.inf: 216 logger.info("duration_size is np.inf") 217 duration_size = fsize_samples_tail // n_chan 218 assert fsize_samples_tail / n_chan == duration_size , f"Incompatability of parameters with shape of file. Either n_chan={nchan} is incorrect or your file {file_path} is corrupted." 
219 else: 220 assert duration_size * n_chan <= fsize_samples_tail , f"Duration size ={duration_size} and offset={offset_size} exceed the end of the file {file_name}" 221 222 223 data_offset = offset_size * n_chan * bytes_per_sample 224 n_samples = duration_size # number of samples per channel 225 226 return _load_binary(file_path,n_chan,n_samples,precision,data_offset)[:,channels] 227 228 229def _load_binary( 230 file_path : str, 231 n_chan : int, 232 n_samples : int, 233 precision : type, 234 data_offset : int = 0) -> np.ndarray: 235 """Helper for load_binary; this is the method that contains the logic. 236 237 Parameters 238 ---------- 239 `file_path : str` 240 Path to binary file with multiplexed data. 241 242 `n_chan : int` 243 The number of channels. 244 245 `n_samples : int` 246 The number of units (samples/measurements) per channel 247 248 `precision : type` 249 The precision of the binary data: type or str rep of type. 250 e.g. numpy.int16 or "int16" are both valid 251 252 `data_offset : int` 253 Exact index of starting time. 254 255 Returns 256 ------- 257 `np.ndarray` 258 the loaded segment of size (n_samples , n_chan) 259 """ 260 total_n_samples = n_samples * n_chan 261 with open(file_path , "rb") as file: 262 # Rem. data_offset: uint = 263 # start_time * sample_rate * n_chan * bytes_per_sample 264 # Rem. 
bytes_per_sample = np.dtype(precision).itemsize 265 file.seek(data_offset) 266 if total_n_samples <= MAX_SAMPLES_PER_CHUNK: 267 data = _load_chunk(file,n_chan,n_samples,precision) 268 else: 269 # Preallocate memory 270 data = np.zeros((n_samples , n_chan) , dtype=precision) 271 # Read all chunks 272 n_samples_per_chunk = MAX_SAMPLES_PER_CHUNK // n_chan * n_chan 273 n_chunks = n_samples // n_samples_per_chunk 274 if not n_chunks: m=0 # extreme rare case, required define m for assertion 275 for j in range(n_chunks): 276 d = _load_chunk(file,n_chan,n_samples_per_chunk,precision) 277 m,_ = d.shape 278 data[j*m:(j+1)*m , :] = d 279 # If data size not multiple of chunk size, read remainder 280 remainder = n_samples - n_chunks * n_samples_per_chunk 281 if remainder: 282 d = _load_chunk(file,n_chan,remainder,precision) 283 m_rem,_ = d.shape 284 assert m_rem # sanity check: logically m_rem cannot be zero 285 assert n_chunks*m == data.shape[0] - m_rem # sanity check 286 data[-m_rem: , :] = d 287 return data 288 289 290def merge_dats( 291 fpaths_in: list, 292 dir_out: str, 293 fname_out: str, 294 precision: str = "int16" 295 ): 296 """Merges all binary files fnames from the directory dir_in. 297 298 Returns nothing (void). 299 300 Parameters 301 ---------- 302 fpaths_in : list 303 The ordered list of binary file paths (names) we are merging. 304 305 dir_out : str 306 The directory we want to save the output to. 307 308 fname_out : str 309 The name of the output file we are saving in dir_out 310 (including the extension, e.g. '.bin' or '.dat') 311 312 precision : str (optional, defaults to "int16") 313 The precision of the data stored in our binary files e.g. 
"int16" 314 """ 315 316 assert os.path.exists(dir_out) 317 # Assert that all the binary files exist and have equal num of bytes 318 # Also, get the size of all the files, in bytes 319 size_in_bytes = _assert_all_files_same_size(fpaths_in) 320 fpath_out = os.path.join(dir_out,fname_out) 321 322 # Define loading parameters 323 n_files = len(fpaths_in) # Equal to number of channels in the output file 324 n_samples_per_chunk = MAX_SAMPLES_PER_CHUNK // n_files * n_files 325 bytes_per_sample = np.dtype(precision).itemsize 326 assert size_in_bytes % bytes_per_sample == 0 # Sanity check 327 n_samples = size_in_bytes // bytes_per_sample # Number of samples in each file 328 # n_chunks = num of full chunks we need to load (there will be a remainder) 329 chunk_size = MAX_SAMPLES_PER_CHUNK 330 n_chunks = n_samples // chunk_size 331 remainder_chunksize = n_samples % chunk_size # In n of samples 332 333 logger.info("Started merging files...") 334 with ExitStack() as stack, open(fpath_out,"wb") as f_out: 335 files = [stack.enter_context(open(fpath,"rb")) for fpath in fpaths_in] 336 337 d_buffer = np.zeros([chunk_size,n_files],dtype=precision) # data buffer, load into f_out 338 for _ in tqdm(range(n_chunks)): # tqdm is a progress bar 339 # Load a chunk from each of the files we are merging into memory 340 for idx,f in enumerate(files): 341 d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,chunk_size,precision)) 342 # Combine the chunks and write them to file 343 f_out.write(bytearray(d_buffer.flatten().tobytes())) # TODO: make sure this is saving things at same precision 344 345 # Add the left over chunk 346 if remainder_chunksize: 347 d_buffer = np.zeros([remainder_chunksize,n_files],dtype=precision) 348 for idx,f in enumerate(files): 349 d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,remainder_chunksize,precision)) 350 # Verify that we truely have reached the end of the file 351 assert not f.read(1), "Logic Error! Wrongly calculated file size." 
352 # Combine the chunks and write them to file 353 f_out.write(bytearray(d_buffer.flatten().tobytes())) 354 logger.info("...Done merging files.") 355 return 356 357 358# Helper, make sure all files contain the same number of bytes 359def _assert_all_files_same_size(filepaths:list): 360 if len(filepaths) == 0: 361 logger.debug("Zero files provided") 362 return None 363 os.path.exists(filepaths[0]) 364 size = os.path.getsize(filepaths[0]) # they should all be same size 365 for fpath in filepaths[1:]: 366 assert os.path.exists(fpath) # Make sure fname exists 367 logger.debug(f"size: {size}") 368 logger.debug(f"getsize fpath: {os.path.getsize(fpath)}") 369 logger.debug(f"fpath = '{fpath}'") 370 assert size == os.path.getsize(fpath) 371 logger.debug(f"All {len(filepaths)} files passed are of size={size} bytes") 372 return size 373 374 375def _load_chunk( 376 file, 377 n_chan : int, 378 n_samples : int, 379 precision : type) -> np.ndarray: 380 """Helper. Loads a chunk of size (n_chan,n_samples) from buffered reader f. 381 382 This is really just a MatLab-esque wrapper for numpy.fromfile(). 383 384 Parameters 385 ---------- 386 `file : an io file buffered reader object` 387 The binary file that you are reading. The python built-in type 388 that you get from open(file_path , "rb") 389 390 `n_chan : int` 391 The number of channels. 392 393 `n_samples : int` 394 The number of units (measurements) in the sample 395 396 `precision : type (or a str representation of a valid type)` 397 The precision of the binary data, 398 e.g. numpy.int16 or "int16" are both valid 399 400 Returns 401 ------- 402 `numpy.ndarray` 403 2D array of shape (n_chan , n_samples) 404 If the binary file contains an ascending sequence [0,1,2,3,...] 
405 then calling _load_chunk with n_chan = 2 and n_samples = 3 will 406 result in the following array [[0, 1], [2, 3], [4, 5]] 407 """ 408 d = np.fromfile( 409 file, 410 dtype = precision, 411 count = n_chan * n_samples).reshape((n_samples,n_chan)) 412 assert (n_samples,n_chan) == d.shape , f"Incompatible size (({n_samples},{n_chan}) == {d.shape})" 413 return d 414 415 416 417if __name__=="__main__": 418 # TESTS 419 import array 420 421 ### Test _load_chunk 422 logger.debug("Testing _load_chunk()...") 423 # Write a binary file 424 arrin = array.array("h" , np.arange(50)) 425 with open("temp_test.dat","wb") as file: 426 arrin.tofile(file) 427 # Read the binary file we just created with _load_chunk 428 with open("temp_test.dat","rb") as file: 429 arrout = _load_chunk(file, n_chan=2, n_samples=5, precision="int16") 430 # Assert that result must be equal to matlab version 431 assert (arrout == np.array([[0,1],[2,3],[4,5],[6,7],[8,9]])).all() 432 # Remove temp binary file 433 os.remove("temp_test.dat") 434 logger.debug("_load_chunk() passed all tests.") 435 436 437 ### Test load_binary 438 logger.debug("Testing load_binary()...") 439 # Write a binary file 440 arrin = array.array("h" , np.arange(50)) 441 with open("temp_test.dat","wb") as file: 442 arrin.tofile(file) 443 def load(fname="temp_test.dat" , **kwargs): # helper 444 with open(fname,"rb") as file: 445 arrout = load_binary(fname,**kwargs) 446 return arrout 447 arrout = load(n_chan=2) 448 assert (arrout == np.arange(50).reshape(25,2)).all() 449 arrout = load(n_chan=2,offset_size=2,duration_size=2) 450 assert (arrout == np.arange(4,8).reshape(2,2)).all() 451 arrout = load(n_chan=5,sample_rate=1,offset_time=5,duration_time=3) 452 assert (arrout == np.arange(25,40).reshape(3,5)).all() 453 # Remove temp binary file 454 os.remove("temp_test.dat") 455 logger.debug("load_binary() passed all tests.") 456 457 458 ### Test load_binary_multiple_segments 459 logger.debug("Testing load_binary_multiple_segments()...") 460 # 
Write a binary file 461 arrin = array.array("h", np.arange(500)) 462 with open("temp_test.dat","wb") as file: 463 arrin.tofile(file) 464 def load_segs(fname="temp_test.dat" , **kwargs): # helper 465 with open(fname,"rb") as file: 466 arrout = load_binary_multiple_segments(fname, **kwargs) 467 return arrout 468 arrout = load_segs(n_chan=2,sample_rate=2,offset_times=[10,20,30],duration_time=5,channels=[0,1],precision="int16") 469 assert (arrout.shape == np.array([3,10,2])).all() 470 assert (arrout[0,:,:] == np.arange(40,60).reshape(10,2)).all() 471 assert (arrout[2,:,:] == np.arange(120,140).reshape(10,2)).all() 472 # Same array load, therefore same tests, but with different args 473 arrout = load_segs(n_chan=2,sample_rate=2,offset_sizes=[20,40,60],duration_size=10,precision="int16") 474 assert (arrout.shape == np.array([3,10,2])).all() 475 assert (arrout[0,:,:] == np.arange(40,60).reshape(10,2)).all() 476 assert (arrout[2,:,:] == np.arange(120,140).reshape(10,2)).all() 477 logger.debug("load_binary_multiple_segments() all tests Passed") 478 479 480 ### Test merge_dats() 481 # by nature, merge_dats() contains some very dense code 482 logger.debug("Testing merge_dats()...") 483 # TEST 1 484 # Create some binary files 485 arr1 = np.asarray([0,2,4,6,8,10,12,14]) 486 arr2 = np.asarray([1,3,5,7,9,11,13,15]) 487 arr1.astype('int16').tofile("./arr1.dat") 488 arr2.astype('int16').tofile("./arr2.dat") 489 # Merge them 490 merge_dats( 491 fpaths_in=["./arr1.dat","./arr2.dat"], 492 dir_out="./", 493 fname_out="arrs1and2.dat", 494 precision="int16") 495 # Examine outfile 496 merged = np.fromfile("./arrs1and2.dat",dtype="int16") 497 expected_merged = np.arange(16,dtype="int16") 498 print(f"Merged dats: {merged}") 499 print(f"Expected: {expected_merged}") 500 for i,j in zip(merged,expected_merged): assert i==j 501 # TEST 2 502 # Create some binary files 503 arr1 = np.arange(0,2*MAX_SAMPLES_PER_CHUNK,2) % 32768 # max val int16 504 arr2 = np.arange(1,2*MAX_SAMPLES_PER_CHUNK,2) % 
32768 # max val int16 505 arr1.astype('int16').tofile("./arr1.dat") 506 arr2.astype('int16').tofile("./arr2.dat") 507 # Merge them 508 merge_dats( 509 fpaths_in=["./arr1.dat","./arr2.dat"], 510 dir_out="./", 511 fname_out="arrs1and2.dat", 512 precision="int16") 513 merged = np.fromfile("./arrs1and2.dat",dtype="int16") 514 expected_merged = np.arange(0,2*MAX_SAMPLES_PER_CHUNK) % 32768 515 # Examine outfile 516 for i,j in zip(merged,expected_merged): assert i==j 517 # Clean up 518 os.remove("./arr1.dat") 519 os.remove("./arr2.dat") 520 os.remove("./arrs1and2.dat") 521 522 523 # TODO: Write test for _load_binary() 524 525
30def get_n_samples_from_dur_fs(dur,fs): 31 """Utility, get the number of samples in a time window.""" 32 return int(dur * fs + 0.5)
Utility, get the number of samples in a time window.
34def load_binary_multiple_segments( 35 file_path : str, 36 n_chan : int = 1, 37 sample_rate : int = None, 38 offset_times : list = [], 39 duration_time : float or None = None, 40 offset_sizes : list = [], 41 duration_size : int or None = None, 42 channels : list = [], 43 precision : type = "int16" 44 ) -> np.ndarray: 45 """Load many segments of data from multiplexed binary file. 46 47 Either provide a list of offset times and a duration time in seconds 48 XOR provide a list of offset sizes and a duration size for the segment 49 in number of samples. 50 51 Parameters 52 ---------- 53 `file_path : str` 54 Path to a .dat binary file 55 56 `n_chan : int` 57 Number of data channels in the file (defaults to 1) 58 59 `sample_rate : int or float` 60 Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) 61 Defaults to None, if none, must specify offset_size and duration_size 62 63 `offset_times : list or np.ndarray` 64 Positions to start reading in seconds, (aka start_time), (defaults to empty) 65 66 `duration_time : float or None = None ` 67 Duration to read in seconds (per channel) (defaults to None) 68 69 `offset_sizes : list or np.ndarray` 70 Positions to start reading in num of samples, defaults to empty. 71 72 `duration_size : int or None` 73 Duration to read in number of samples (per channel) (defaults to None) 74 75 `channels : list ` 76 Indices of channels to read from, defaults to empty and uses all chs. 77 78 `precision : str` 79 Sample precision, defaults to 'int16'. 
80 81 82 Returns 83 ------- 84 `numpy.ndarray` 85 A 3d array containg the segments' data, with shape 86 (n_segments , n_samples , n_binary_channels) 87 """ 88 # If required, convert time to n_samples (aka sizes) 89 if list(offset_times): # falsy 90 assert duration_time is not None, "Duration time must be specified" 91 assert duration_time > 0 , "Duration time must be specified" 92 assert not offset_sizes, "Cannot specify both times and sizes" 93 assert not duration_size, "Cannot specify both times and sizes" 94 offset_sizes = [get_n_samples_from_dur_fs(dt,sample_rate) for dt in offset_times] 95 duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate) 96 assert list(offset_sizes) 97 assert duration_size > 0 98 if not channels: channels = [i for i in range(n_chan)] 99 # TODO: check whether they are integer values? Prob not necessary 100 # TODO: check channels are valid ints in valid range 101 102 n_segments = len(offset_sizes) # the number of segments 103 # Allocate space in memory 104 segments_data = np.zeros((n_segments, duration_size, len(channels)),dtype=precision) 105 for idx,offset_size in enumerate(offset_sizes): 106 segments_data[idx,:,:] = load_binary( 107 file_path, 108 n_chan, 109 sample_rate, 110 offset_size=offset_size, 111 duration_size=duration_size, 112 channels=channels, 113 precision=precision) 114 return segments_data
Load many segments of data from multiplexed binary file.
Either provide a list of offset times and a duration time in seconds XOR provide a list of offset sizes and a duration size for the segment in number of samples.
Parameters
file_path : str
Path to a .dat binary file
n_chan : int
Number of data channels in the file (defaults to 1)
sample_rate : int or float
Sample rate in Hz, (aka fs, frequency, sr is the MNE convention)
Defaults to None, if none, must specify offset_size and duration_size
offset_times : list or np.ndarray
Positions to start reading in seconds, (aka start_time), (defaults to empty)
duration_time : float or None = None
Duration to read in seconds (per channel) (defaults to None)
offset_sizes : list or np.ndarray
Positions to start reading in num of samples, defaults to empty.
duration_size : int or None
Duration to read in number of samples (per channel) (defaults to None)
channels : list
Indices of channels to read from, defaults to empty and uses all chs.
precision : str
Sample precision, defaults to 'int16'.
Returns
numpy.ndarray
A 3d array containing the segments' data, with shape
(n_segments , n_samples , n_binary_channels)
116def load_binary( 117 file_path : str, 118 n_chan : int = 1, 119 sample_rate : int = None, 120 offset_time : float = None, 121 duration_time : float = None, 122 offset_size : int = None, 123 duration_size : int = None, 124 channels : list = [], 125 precision : type = "int16") -> np.ndarray: 126 """Load data from a multiplexed binary file. 127 128 Reading a subset of data can be done in two different manners: 129 either by specifying start time ("offset_time") and duration ("duration_time") 130 (more intuitive), or by indicating the position ("offset_size") and size of 131 the subset in terms of number of samples per channel ("duration_size") 132 (more accurate). The function will raise an error if both 'time' and 'size' 133 arguments are provided, this is to avoid ambiguity. 134 135 Parameters 136 ---------- 137 `file_path : str` 138 Path to a .dat binary file 139 140 `n_chan : int` 141 Number of data channels in the file (defaults to 1) 142 143 `sample_rate : int or float` 144 Sample rate in Hz, (aka fs, frequency, sr is the MNE convention) 145 Defaults to None, if none, must specify offset_size and duration_size 146 147 `offset_time : int or float or None` 148 Position to start reading in seconds, (aka start_time) (defaults to None) 149 150 `duration_time : int or float or None` 151 Duration to read in seconds, (defaults to Inf) 152 153 `offset_size : int or None` 154 Position to start reading in samples (per channel) (defaults to None) 155 156 `duration_size : int or None` 157 Duration to read in number of samples (per channel) (defaults to None) 158 159 `channels : list or None` 160 Indices of channels to read from, defaults to None, if None uses all chs. 161 162 `precision : str` 163 Sample precision, defaults to 'int16'. 164 165 Returns 166 ------- 167 `numpy.ndarray` 168 A 2d array containg the specified segment's data. 
(1d if only one chan) 169 """ 170 # Checks to make sure the intput is correct 171 assert n_chan == int(n_chan) 172 assert n_chan >= 1 173 logger.debug(f"{n_chan} channel(s) in this binary file") 174 assert os.path.exists(file_path) , f"{file_path} appears not to exist." 175 if sample_rate is not None: assert sample_rate > 0 , f"Sample rate must be positive {sample_rate}" 176 if channels: 177 assert len(channels) <= n_chan , "Too many channels passed" 178 assert len(set(channels)) == len(channels) , "Repeating channels" 179 for chan in channels: 180 assert chan < n_chan and chan >= 0 , "Channel out of range" 181 assert int(chan) == chan , "Wrong type, must be int" 182 else: channels = [i for i in range(n_chan)] 183 184 # Either all four args are none -> read whole file xor: 185 # offset_time,duration_time xor offset_size,duration_size 186 # are both None (not just Falsy!) 187 if sample_rate == None: assert (offset_time,duration_time)==(None,)*2 188 if (offset_time,duration_time,offset_size,duration_size)==(None,)*4: 189 offset_size = 0 190 duration_size = np.inf 191 elif (offset_time,duration_time) == (None,)*2: 192 if offset_size == None: offset_size = 0 193 if duration_size == None: duration_size = np.inf 194 elif (offset_size,duration_size) == (None,)*2: 195 assert sample_rate 196 offset_size = 0 197 duration_size = np.inf 198 if offset_time: 199 offset_size = get_n_samples_from_dur_fs(offset_time,sample_rate) 200 if duration_time: 201 duration_size = get_n_samples_from_dur_fs(duration_time,sample_rate) 202 else: 203 raise Exception("Invalid Argument Combination!\nYou cannot specify both size-like and a time-like arguments for the duration and offset.") 204 assert offset_size >= 0 and int(offset_size) == offset_size , f"Bad offset {offset_size}" 205 assert duration_size > 0 , f"Non-positive duration size {duration_size}" 206 207 208 # Figure out what the data offset is in bytes 209 bytes_per_sample = np.dtype(precision).itemsize 210 fsize_bytes = 
os.path.getsize(file_path) # file size in num of bytes 211 fsize_samples = fsize_bytes // bytes_per_sample # file size in num of samples 212 assert fsize_bytes / bytes_per_sample == fsize_samples 213 fsize_samples_tail = fsize_samples - offset_size 214 215 # Make sure duration_size is compatible with file size and offset 216 if duration_size == np.inf: 217 logger.info("duration_size is np.inf") 218 duration_size = fsize_samples_tail // n_chan 219 assert fsize_samples_tail / n_chan == duration_size , f"Incompatability of parameters with shape of file. Either n_chan={nchan} is incorrect or your file {file_path} is corrupted." 220 else: 221 assert duration_size * n_chan <= fsize_samples_tail , f"Duration size ={duration_size} and offset={offset_size} exceed the end of the file {file_name}" 222 223 224 data_offset = offset_size * n_chan * bytes_per_sample 225 n_samples = duration_size # number of samples per channel 226 227 return _load_binary(file_path,n_chan,n_samples,precision,data_offset)[:,channels]
Load data from a multiplexed binary file.
Reading a subset of data can be done in two different manners: either by specifying start time ("offset_time") and duration ("duration_time") (more intuitive), or by indicating the position ("offset_size") and size of the subset in terms of number of samples per channel ("duration_size") (more accurate). The function will raise an error if both 'time' and 'size' arguments are provided, this is to avoid ambiguity.
Parameters
file_path : str
Path to a .dat binary file
n_chan : int
Number of data channels in the file (defaults to 1)
sample_rate : int or float
Sample rate in Hz, (aka fs, frequency, sr is the MNE convention)
Defaults to None, if none, must specify offset_size and duration_size
offset_time : int or float or None
Position to start reading in seconds, (aka start_time) (defaults to None)
duration_time : int or float or None
Duration to read in seconds, (defaults to Inf)
offset_size : int or None
Position to start reading in samples (per channel) (defaults to None)
duration_size : int or None
Duration to read in number of samples (per channel) (defaults to None)
channels : list or None
Indices of channels to read from, defaults to None, if None uses all chs.
precision : str
Sample precision, defaults to 'int16'.
Returns
numpy.ndarray
A 2d array containing the specified segment's data. (1d if only one chan)
291def merge_dats( 292 fpaths_in: list, 293 dir_out: str, 294 fname_out: str, 295 precision: str = "int16" 296 ): 297 """Merges all binary files fnames from the directory dir_in. 298 299 Returns nothing (void). 300 301 Parameters 302 ---------- 303 fpaths_in : list 304 The ordered list of binary file paths (names) we are merging. 305 306 dir_out : str 307 The directory we want to save the output to. 308 309 fname_out : str 310 The name of the output file we are saving in dir_out 311 (including the extension, e.g. '.bin' or '.dat') 312 313 precision : str (optional, defaults to "int16") 314 The precision of the data stored in our binary files e.g. "int16" 315 """ 316 317 assert os.path.exists(dir_out) 318 # Assert that all the binary files exist and have equal num of bytes 319 # Also, get the size of all the files, in bytes 320 size_in_bytes = _assert_all_files_same_size(fpaths_in) 321 fpath_out = os.path.join(dir_out,fname_out) 322 323 # Define loading parameters 324 n_files = len(fpaths_in) # Equal to number of channels in the output file 325 n_samples_per_chunk = MAX_SAMPLES_PER_CHUNK // n_files * n_files 326 bytes_per_sample = np.dtype(precision).itemsize 327 assert size_in_bytes % bytes_per_sample == 0 # Sanity check 328 n_samples = size_in_bytes // bytes_per_sample # Number of samples in each file 329 # n_chunks = num of full chunks we need to load (there will be a remainder) 330 chunk_size = MAX_SAMPLES_PER_CHUNK 331 n_chunks = n_samples // chunk_size 332 remainder_chunksize = n_samples % chunk_size # In n of samples 333 334 logger.info("Started merging files...") 335 with ExitStack() as stack, open(fpath_out,"wb") as f_out: 336 files = [stack.enter_context(open(fpath,"rb")) for fpath in fpaths_in] 337 338 d_buffer = np.zeros([chunk_size,n_files],dtype=precision) # data buffer, load into f_out 339 for _ in tqdm(range(n_chunks)): # tqdm is a progress bar 340 # Load a chunk from each of the files we are merging into memory 341 for idx,f in enumerate(files): 342 
d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,chunk_size,precision)) 343 # Combine the chunks and write them to file 344 f_out.write(bytearray(d_buffer.flatten().tobytes())) # TODO: make sure this is saving things at same precision 345 346 # Add the left over chunk 347 if remainder_chunksize: 348 d_buffer = np.zeros([remainder_chunksize,n_files],dtype=precision) 349 for idx,f in enumerate(files): 350 d_buffer[:,idx] = np.squeeze(_load_chunk(f,1,remainder_chunksize,precision)) 351 # Verify that we truely have reached the end of the file 352 assert not f.read(1), "Logic Error! Wrongly calculated file size." 353 # Combine the chunks and write them to file 354 f_out.write(bytearray(d_buffer.flatten().tobytes())) 355 logger.info("...Done merging files.") 356 return
Merges the ordered binary files given in fpaths_in into a single multiplexed binary file.
Returns nothing (void).
Parameters
fpaths_in : list The ordered list of binary file paths (names) we are merging.
dir_out : str The directory we want to save the output to.
fname_out : str The name of the output file we are saving in dir_out (including the extension, e.g. '.bin' or '.dat')
precision : str (optional, defaults to "int16") The precision of the data stored in our binary files e.g. "int16"