diff --git a/fcsparser/api.py b/fcsparser/api.py index 7b9bbb6..bcabee7 100644 --- a/fcsparser/api.py +++ b/fcsparser/api.py @@ -1,21 +1,23 @@ #!/usr/bin/env python -# Eugene Yurtsev 07/20/2013 -# Distributed under the MIT License +""" +Distributed under the MIT License. -# Thanks to: -# - Ben Roth : adding a fix for Accuri C6 fcs files. +Thanks to: +- Ben Roth : adding a fix for Accuri C6 fcs files. -## -# Useful documentation for dtypes in numpy -# http://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.byteswap.html?highlight=byteswap#numpy.ndarray.byteswap -# http://docs.scipy.org/doc/numpy/user/basics.types.html -# http://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html +Useful documentation for dtypes in numpy +http://docs.scipy.org/doc/numpy/reference/generated/numpy.ndarray.byteswap.html?highlight=byteswap#numpy.ndarray.byteswap # noqa +http://docs.scipy.org/doc/numpy/user/basics.types.html +http://docs.scipy.org/doc/numpy/reference/arrays.dtypes.html +""" from __future__ import division +from io import BytesIO import string import sys import warnings -from io import BytesIO +import contextlib +import logging import numpy @@ -23,67 +25,67 @@ import pandas as pd except ImportError: pd = None - warnings.warn("pandas is not installed, so the parse_fcs function " - "can only be used together with numpy.") + warnings.warn(u'pandas is not installed, so the parse_fcs function can only be used together ' + u'with numpy.') + +logger = logging.getLogger(__name__) + + +def fromfile(file, dtype, count, *args, **kwargs): + """Wrapper around np.fromfile to support any file-like object.""" + try: + return numpy.fromfile(file, dtype=dtype, count=count, *args, **kwargs) + except (TypeError, IOError): + return numpy.frombuffer(file.read(count * numpy.dtype(dtype).itemsize), + dtype=dtype, count=count, *args, **kwargs) class ParserFeatureNotImplementedError(Exception): - """Raised when encountering fcs files with an unfamiliar feature""" + """Raise when 
encountering fcs files for which parsing hasn't been implemented.""" pass -# def raise_parser_feature_not_implemented(message): -# print(("Some of the parser features have not yet been implemented.\n" -# "If you would like to see this feature implemented, please send a sample FCS file\n" -# " to the developers.\n" -# "The following problem was encountered with your FCS file:\n" -# " {0} ").format(message)) -# raise NotImplementedError(message) +class FCSParser(object): + def __init__(self, path=None, read_data=True, channel_naming='$PnS'): + """Parse FCS files. + Compatible with most FCS 2.0, 3.0, 3.1 files. -class FCSParser(object): - """A Parser for .fcs files. - Should work for many FCS 2.0, 3.0, 3.1 + self.annotation: a dictionary holding the parsed content of the TEXT segment + In addition, a key called __header__ has been added to this dictionary + It specifies the information parsed from the FCS file HEADER segment. + (This won't be necessary for most users.) - self.annotation: a dictionary holding the parsed content of the TEXT segment - In addition, a key called __header__ has been added to this dictionary - It specifies the information parsed from the FCS file HEADER segment. - (This won't be necessary for most users.) + self.data holds the parsed DATA segment + self.analysis holds the ANALYSIS segment as read from the file. - self.data holds the parsed DATA segment - self.analysis holds the ANALYSIS segment as read from the file. + After the data segment is read: + self.channel_names holds the chosen names of the channels + self.channel_names_alternate holds the alternate names of the channels - After the data segment is read: - self.channel_names holds the chosen names of the channels - self.channel_names_alternate holds the alternate names of the channels - """ + Args: + path : str + Path of .fcs file + read_data : bool + If True, reads the data immediately. + Otherwise, use read_data method to read in the data from the fcs file. 
+ channel_naming: '$PnS' | '$PnN' + Determines which meta data field is used for naming the channels. - def __init__(self, path=None, read_data=True, channel_naming='$PnS'): - """ - Parameters - ---------- - path : str - Path of .fcs file - read_data : bool - If True, reads the data immediately. - Otherwise, use read_data method to read in the data from the fcs file. - channel_naming: '$PnS' | '$PnN' - Determines which meta data field is used for naming the channels. - - The default should be $PnS (even though it is not guaranteed to be unique) - - $PnN stands for the short name (guaranteed to be unique). - Will look like 'FL1-H' - $PnS stands for the actual name (not guaranteed to be unique). - Will look like 'FSC-H' (Forward scatter) - - The chosen field will be used to population self.channels. - - If the chosen field does not exist in the fcs file. - The program attempts to use the alternative field by default. - - Note: These names are not flipped in the implementation. - It looks like they were swapped for some reason in the official FCS specification. + The default should be $PnS (even though it is not guaranteed to be unique) + + $PnN stands for the short name (guaranteed to be unique). + Will look like 'FL1-H' + $PnS stands for the actual name (not guaranteed to be unique). + Will look like 'FSC-H' (Forward scatter) + + The chosen field will be used to populate self.channels. + + If the chosen field does not exist in the fcs file, + the program attempts to use the alternative field by default. + + Note: These names are not flipped in the implementation. + It looks like they were swapped for some reason in the official FCS specification. 
""" self._data = None self._channel_naming = channel_naming @@ -94,20 +96,22 @@ def __init__(self, path=None, read_data=True, channel_naming='$PnS'): self._data_start = -1 self._data_end = -1 self.channel_numbers = [] - self._analysis = '' + self._analysis = None self._file_size = 0 if channel_naming not in ('$PnN', '$PnS'): - raise ValueError("channel_naming must be either '$PnN' or '$PnS") + raise ValueError('channel_naming must be either "$PnN" or "$PnS"') self.annotation = {} self.path = path + if path: with open(path, 'rb') as f: self.load_file(f, read_data) def load_file(self, file_handle, read_data=True): + """""" file_handle.seek(0, 2) self._file_size = file_handle.tell() file_handle.seek(0) @@ -116,19 +120,29 @@ def load_file(self, file_handle, read_data=True): if read_data: self.read_data(file_handle) - @staticmethod - def from_data(data): - '''Loads an FCS file from a bytes-like object''' - file_handle = BytesIO(data) - obj = FCSParser() - obj.load_file(file_handle) + @classmethod + def from_data(cls, data): + """Load an FCS file from a bytes-like object. + + Args: + data: buffer containing contents of an FCS file. + + Returns: + FCSParser instance with data loaded + """ + obj = cls() + with contextlib.closing(BytesIO(data)) as file_handle: + obj.load_file(file_handle) return obj def read_header(self, file_handle): - """ - Reads the header of the FCS file. - The header specifies where the annotation, - data and analysis are located inside the binary file. + """Read the header of the FCS file. + + The header specifies where the annotation, data and analysis are located inside the binary + file. + + Args: + file_handle: buffer containing FCS file. """ header = {'FCS format': file_handle.read(6)} @@ -146,28 +160,28 @@ def read_header(self, file_handle): # Checking that the location of the TEXT segment is specified for k in ['text start', 'text end']: if header[k] == 0: - raise ValueError( - "The FCS file '{}' seems corrupted. 
(Parser cannot locate information " - "about the '{}' segment.)".format(self.path, k)) + raise ValueError('The FCS file "{}" seems corrupted. (Parser cannot locate ' + 'information about the "{}" segment.)'.format(self.path, k)) elif header[k] > self._file_size: - raise ValueError("The FCS file '{}' is corrupted. '{}' segment " - "is larger than file size".format(self.path, k)) + raise ValueError('The FCS file "{}" is corrupted. "{}" segment ' + 'is larger than file size'.format(self.path, k)) + else: + # All OK + pass self._data_start = header['data start'] self._data_end = header['data start'] if header['analysis start'] != 0: - warnings.warn( - "There appears to be some information in the ANALYSIS segment of file {0}. " - "However, it might not be read correctly.".format( - self.path)) + warnings.warn('There appears to be some information in the ANALYSIS segment of file ' + '{0}. However, it might not be read correctly.'.format(self.path)) self.annotation['__header__'] = header def read_text(self, file_handle): - """ - Reads the TEXT segment of the FCS file. - This is the meta data associated with the FCS file. + """Parse the TEXT segment of the FCS file. + + The TEXT segment contains meta data associated with the FCS file. Converting all meta keywords to lower case. 
""" header = self.annotation['__header__'] # For convenience @@ -180,10 +194,9 @@ def read_text(self, file_handle): try: raw_text = raw_text.decode('utf-8') except UnicodeDecodeError as e: - print("Encountered an illegal utf-8 byte in the header.\n" + - "Illegal utf-8 characters will be ignored.\n" + - "The illegal byte was {} at position {}".format( - repr(e.object[e.start]), e.start)) + logger.warning(u'Encountered an illegal utf-8 byte in the header.\n Illegal utf-8 ' + u'characters will be ignored.\n The illegal byte was {} at ' + u'position {}.'.format(repr(e.object[e.start]), e.start)) raw_text = raw_text.decode('utf-8', 'ignore') ##### @@ -193,9 +206,9 @@ def read_text(self, file_handle): if raw_text[-1] != delimiter: raw_text = raw_text.strip() if raw_text[-1] != delimiter: - msg = 'The first two characters were:\n {}. The last two characters were: {}\n' \ - 'Parser expects the same delimiter character in beginning ' \ - 'and end of TEXT segment'.format(repr(raw_text[:2]), (repr(raw_text[-2:]))) + msg = (u'The first two characters were:\n {}. The last two characters were: {}\n' + u'Parser expects the same delimiter character in beginning ' + u'and end of TEXT segment'.format(repr(raw_text[:2]), (repr(raw_text[-2:])))) raise ParserFeatureNotImplementedError(msg) # Below 1:-1 used to remove first and last characters which should be reserved for delimiter @@ -203,7 +216,7 @@ def read_text(self, file_handle): keys, values = raw_text_segments[0::2], raw_text_segments[1::2] text = {key: value for key, value in zip(keys, values)} # build dictionary - #### + ## # Extract channel names and convert some of the channel properties # and other fields into numeric data types (from string) # Note: do not use regular expressions for manipulations here. 
@@ -231,9 +244,6 @@ def read_text(self, file_handle): # Convert some of the fields into integer values keys_encoding_bits = ['$P{0}B'.format(i) for i in self.channel_numbers] - # unused currently but keep for debugging - # keys_encoding_range = ['$P{0}R'.format(i) for i in self.channel_numbers] - add_keys_to_convert_to_int = ['$NEXTDATA', '$PAR', '$TOT'] keys_to_convert_to_int = keys_encoding_bits + add_keys_to_convert_to_int @@ -251,18 +261,14 @@ def read_text(self, file_handle): if self._data_end == 0: self._data_end = int(text['$ENDDATA']) - ## - # Keep for debugging - # key_list = self.header['text'].keys() - # for key in sorted(text.keys()): - # print key, text[key] - # raise Exception('here') - def read_analysis(self, file_handle): - """ - Reads the ANALYSIS segment of the FCS file and stores it in self.analysis - Warning: This has never been tested with an actual - fcs file that contains an analysis segment. + """Read the ANALYSIS segment of the FCS file and store it in self.analysis. + + Warning: This has never been tested with an actual fcs file that contains an + analysis segment. + + Args: + file_handle: buffer containing FCS data """ start = self.annotation['__header__']['analysis start'] end = self.annotation['__header__']['analysis end'] @@ -270,12 +276,10 @@ def read_analysis(self, file_handle): file_handle.seek(start, 0) self._analysis = file_handle.read(end - start) else: - self._analysis = '' + self._analysis = None - def _check_assumptions(self): - """ - Checks the FCS file to make sure that some of the assumptions made by the parser are met. - """ + def _verify_assumptions(self): + """Verify that all assumptions made by the parser hold.""" text = self.annotation keys = text.keys() @@ -293,10 +297,7 @@ def _check_assumptions(self): '$BYTEORD {} not implemented'.format(text['$BYTEORD'])) def get_channel_names(self): - """ - Figures out which the channel names to use. - Raises a warning if the names are not unique. 
- """ + """Get list of channel names. Raises a warning if the names are not unique.""" names_s, names_n = self.channel_names_s, self.channel_names_n # Figure out which channel names to use @@ -309,13 +310,13 @@ def get_channel_names(self): channel_names = channel_names_alternate if len(set(channel_names)) != len(channel_names): - msg = ('The default channel names (defined by the {} ' + - 'parameter in the FCS file) were not unique. To avoid ' + - 'problems in downstream analysis, the channel names ' + - 'have been switched to the alternate channel names ' + - 'defined in the FCS file. To avoid ' + - 'seeing this warning message, explicitly instruct ' + - 'the FCS parser to use the alternate channel names by ' + + msg = ('The default channel names (defined by the {} ' + 'parameter in the FCS file) were not unique. To avoid ' + 'problems in downstream analysis, the channel names ' + 'have been switched to the alternate channel names ' + 'defined in the FCS file. To avoid ' + 'seeing this warning message, explicitly instruct ' + 'the FCS parser to use the alternate channel names by ' 'specifying the channel_naming parameter.') msg = msg.format(self._channel_naming) warnings.warn(msg) @@ -324,8 +325,8 @@ def get_channel_names(self): return channel_names def read_data(self, file_handle): - """ Reads the DATA segment of the FCS file. 
""" - self._check_assumptions() + """Read the DATA segment of the FCS file.""" + self._verify_assumptions() text = self.annotation if (self._data_start > self._file_size) or (self._data_end > self._file_size): @@ -348,21 +349,19 @@ def read_data(self, file_handle): conversion_dict = {'F': 'f', 'D': 'f', 'I': 'u'} if text['$DATATYPE'] not in conversion_dict.keys(): - raise ParserFeatureNotImplementedError( - '$DATATYPE = {0} is not yet supported.'.format(text['$DATATYPE'])) + raise ParserFeatureNotImplementedError('$DATATYPE = {0} is not yet ' + 'supported.'.format(text['$DATATYPE'])) # Calculations to figure out data types of each of parameters # $PnB specifies the number of bits reserved for a measurement of parameter n bytes_per_par_list = [int(text['$P{0}B'.format(i)] / 8) for i in self.channel_numbers] par_numeric_type_list = [ - '{endian}{type}{size}'.format(endian=endian, type=conversion_dict[text['$DATATYPE']], + '{endian}{type}{size}'.format(endian=endian, + type=conversion_dict[text['$DATATYPE']], size=bytes_per_par) - for bytes_per_par in bytes_per_par_list] - - # unused currently, but keep for debugging - # bytes_per_event = sum(bytes_per_par_list) - # total_bytes = bytes_per_event * num_events + for bytes_per_par in bytes_per_par_list + ] # Parser for list mode. Here, the order is a list of tuples. # Each tuple stores event related information @@ -383,7 +382,7 @@ def read_data(self, file_handle): data = data.reshape((num_events, num_pars)) ## # Convert to native byte order - # This is needed for working with pandas datastructures + # This is needed for working with pandas data structures native_code = '<' if (sys.byteorder == 'little') else '>' if endian != native_code: # swaps the actual bytes and also the endianness @@ -393,7 +392,7 @@ def read_data(self, file_handle): @property def data(self): - """ Holds the parsed DATA segment of the FCS file. 
""" + """Get parsed DATA segment of the FCS file.""" if self._data is None: with open(self.path, 'rb') as f: self.read_data(f) @@ -401,16 +400,17 @@ def data(self): @property def analysis(self): - """ Holds the parsed ANALYSIS segment of the FCS file. """ - if self._analysis == '': + """Get ANALYSIS segment of the FCS file.""" + if self._analysis is None: with open(self.path, 'rb') as f: self.read_analysis(f) return self._analysis def reformat_meta(self): - """ Collects the meta data information in a more user friendly format. - Function looks through the meta data, collecting the channel related information - into a dataframe and moving it into the _channels_ key + """Collect the meta data information in a more user friendly format. + + Function looks through the meta data, collecting the channel related information into a + dataframe and moving it into the _channels_ key. """ meta = self.annotation # For shorthand (passed by reference) channel_properties = [] @@ -444,28 +444,27 @@ def reformat_meta(self): meta['_channels_'] = df meta['_channel_names_'] = self.get_channel_names() - # Constructs Pandas dataframe + @property def dataframe(self): + """Construct Pandas dataframe.""" data = self.data channel_names = self.get_channel_names() return pd.DataFrame(data, columns=channel_names) def parse(path, meta_data_only=False, output_format='DataFrame', compensate=False, - channel_naming='$PnS', - reformat_meta=False): - """ - Parse an fcs file at the location specified by the path. + channel_naming='$PnS', reformat_meta=False): + """Parse an fcs file at the location specified by the path. 
Parameters ---------- - path : str + path: str Path of .fcs file - meta_data_only : bool + meta_data_only: bool If True, the parse_fcs only returns the meta_data (the TEXT segment of the FCS file) - output_format : 'DataFrame' | 'ndarray' + output_format: 'DataFrame' | 'ndarray' If set to 'DataFrame' the returned - channel_naming : '$PnS' | '$PnN' + channel_naming: '$PnS' | '$PnN' Determines which meta data field is used for naming the channels. The default should be $PnS (even though it is not guaranteed to be unique) @@ -478,15 +477,14 @@ def parse(path, meta_data_only=False, output_format='DataFrame', compensate=Fals Note: These names are not flipped in the implementation. It looks like they were swapped for some reason in the official FCS specification. - - reformat_meta : bool + reformat_meta: bool If true, the meta data is reformatted with the channel information organized into a DataFrame and moved into the '_channels_' key Returns ------- if meta_data_only is True: - meta_data : dict + meta_data: dict Contains a dictionary with the meta data information Otherwise: a 2-tuple with @@ -499,6 +497,7 @@ def parse(path, meta_data_only=False, output_format='DataFrame', compensate=Fals meta = parse_fcs(fname, meta_data_only=True) meta, data_pandas = parse_fcs(fname, meta_data_only=False, output_format='DataFrame') meta, data_numpy = parse_fcs(fname, meta_data_only=False, output_format='ndarray') + """ if compensate: raise ParserFeatureNotImplementedError("Compensation has not been implemented yet.") @@ -519,18 +518,9 @@ def parse(path, meta_data_only=False, output_format='DataFrame', compensate=Fals if meta_data_only: return meta elif output_format == 'DataFrame': - return meta, parsed_fcs.dataframe() + return meta, parsed_fcs.dataframe elif output_format == 'ndarray': # Constructs numpy matrix return meta, parsed_fcs.data else: raise ValueError("The output_format must be either 'ndarray' or 'DataFrame'") - - -def fromfile(file, dtype, count, *args, **kwargs): - 
"""Wrapper around np.fromfile to support any file-like object""" - - try: - return numpy.fromfile(file, dtype=dtype, count=count, *args, **kwargs) - except (TypeError, IOError): - return numpy.frombuffer(file.read(count * numpy.dtype(dtype).itemsize), dtype=dtype, count=count, *args, **kwargs)