#7

alternative interface to mappers/reducers

    • Status: Fixed
    • Priority: Normal (3)
    • Component: -
    • Estimate: None
    I've written a patch that adds an alternative interface for mapper and reducer functions. Dumbo automatically detects which interface a function uses and calls it accordingly.

    I'm doing this so that I can integrate my existing Python ETL framework into Dumbo. The ETL framework follows a pipes-and-filters pipeline model: the input filter at the head of the pipeline must be given an iterator to pull from; the framework is not designed to expose a function for something else to call. For instance, the etl.file.DelimitedInput input filter takes a file object as a parameter and iterates over the file. Another example is the etl.db.Query input filter: it takes a db connection and SQL text as parameters and iterates over the db cursor (obviously, this input filter would not be used in Hadoop, because I don't want any db connectivity on the Hadoop mapreduce nodes).

    So, in the alternative interface, a mapper or reducer is passed an iterator over its input as its single parameter. Functions would then be written like:

    def mapper(data):
        # Initialization goes here
        for key, value in data:
            for word in value.split():
                yield word, 1
        # Cleanup goes here
    
    def reducer(data):
        # Initialization goes here
        for key, values in data:
            yield key, sum(values)
        # Cleanup goes here
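
The automatic detection mentioned at the top of this ticket could work roughly as follows. This is only a sketch, written for Python 3, and it assumes the interface is recognized by counting positional parameters (one parameter means iterator style, two means per-record style); the attached patch may detect it differently. The names uses_iterator_interface and run_mapper are hypothetical.

```python
import inspect


def uses_iterator_interface(func):
    """Guess the interface from the number of positional parameters.

    Assumption: one parameter means the function takes the whole input
    iterator; two parameters mean it is called once per (key, value).
    """
    positional = (
        inspect.Parameter.POSITIONAL_ONLY,
        inspect.Parameter.POSITIONAL_OR_KEYWORD,
    )
    params = [
        p for p in inspect.signature(func).parameters.values()
        if p.kind in positional
    ]
    return len(params) == 1


def run_mapper(func, data):
    """Apply either style of mapper to an iterable of (key, value) pairs."""
    if uses_iterator_interface(func):
        # Iterator style: hand over the whole stream once.
        yield from func(data)
    else:
        # Per-record style: call the function for every pair.
        for key, value in data:
            yield from func(key, value)


def per_record_mapper(key, value):
    for word in value.split():
        yield word, 1


def iterator_mapper(data):
    for key, value in data:
        for word in value.split():
            yield word, 1


records = [(0, "a b"), (1, "b")]
old_style = list(run_mapper(per_record_mapper, records))
new_style = list(run_mapper(iterator_mapper, records))
```

Both calls produce the same pairs, so existing per-record mappers keep working while iterator-style functions receive the stream directly.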


    There are also some other advantages to this interface:

    • No need for mapclose/redclose options: you can define initialization and cleanup inside your mapper/reducer function.
    • Slight performance benefit: not calling a function for every row of input, and one less for-loop.
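
To illustrate the first point, here is a small Python 3 sketch in which setup and cleanup live inside the mapper itself. The events list, the stopwords set, and the run_local helper are all made up for this example; run_local merely imitates the sort-and-group step between map and reduce on a single machine and is not how Dumbo runs jobs.

```python
from itertools import groupby
from operator import itemgetter

events = []  # records when setup and cleanup run, for illustration only


def mapper(data):
    events.append("map setup")        # initialization: runs once
    stopwords = {"the", "a"}          # hypothetical per-task state
    try:
        for key, value in data:
            for word in value.split():
                if word not in stopwords:
                    yield word, 1
    finally:
        events.append("map cleanup")  # cleanup: runs once, even on error


def reducer(data):
    for key, values in data:
        yield key, sum(values)


def run_local(mapper, reducer, records):
    """Imitate the shuffle locally: sort mapper output, then group by key."""
    mapped = sorted(mapper(iter(records)), key=itemgetter(0))
    grouped = (
        (key, (value for _, value in group))
        for key, group in groupby(mapped, key=itemgetter(0))
    )
    return list(reducer(grouped))


result = run_local(mapper, reducer, [(0, "the cat"), (1, "a cat sat")])
```

Because the mapper is a generator, the setup code runs when the first record is requested and the finally block runs when the input is exhausted, which is the role the mapclose/redclose options would otherwise play.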

    Here's an example of a portion of an ETL framework script:

    pl = Pipeline(
     DelimitedInput(sys.stdin, Schema(sf.replace(encoding_errors='replace') for sf in hub1_combined_page.schema),
      keep=keep, drop=drop, rename=rename)
     |
     etl.transform.TwosComplement('network_ip')
     |
     etl.transform.IPString('ip_address=client_ip_addr')
     |
     etl.transform.StringCleaner()
     |
     GenerateNetworkRev()
     |
     etl.db.DatabaseLoaderFile(sys.stdout, 'netezza', load_page_events.schema,
      schema_filter=lambda s: (sf.replace(encoding_errors='replace') for sf in s),
      insert_filters=extra_filters)
    ).do()


    This pipeline does the following:

    • DelimitedInput: Take a tab-delimited file, with the 113 fields described in hub1_combined_page.schema. If the input data does not match the defined types in the schema, convert to the schema's types, using a conversion cache on each column. Use the codecs module's "replace" error handler for unicode decoding errors. Before converting types of fields, keep a list of fields, drop certain fields, and rename certain fields.
    • TwosComplement: For the network_ip field, convert the unsigned 32-bit int field to signed 32-bit, so it'd fit in a 32-bit field in the database, since the database only supports signed values.
    • IPString: Convert an unsigned 32-bit int to dotted-quad string notation.
    • StringCleaner: For all fields in the schema that are of type unicode string, do unicode string cleaning, which includes things like transforming to Unicode Normalization Form C and dealing with unassigned codepoints, control characters, surrogate pairs, and Byte Order Marks.
    • GenerateNetworkRev: Add the network_rev field to the data.
    • DatabaseLoaderFile: Write out the data in netezza database bulk loader file format, using the table schema defined in load_page_events.schema.
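
The two integer conversions in the list above are easy to show in isolation. This is a sketch in Python 3 using only the standard library; to_signed32 and to_dotted_quad are hypothetical helper names and are not taken from the etl framework, whose filters operate on named fields of a row rather than on bare integers.

```python
import ipaddress


def to_signed32(n):
    """Reinterpret an unsigned 32-bit integer as signed two's complement."""
    if not 0 <= n <= 0xFFFFFFFF:
        raise ValueError("expected an unsigned 32-bit integer")
    # Values with the top bit set wrap around to the negative range.
    return n - 0x100000000 if n >= 0x80000000 else n


def to_dotted_quad(n):
    """Format an unsigned 32-bit integer as a dotted-quad IPv4 string."""
    return str(ipaddress.IPv4Address(n))


ip_as_int = 3232235777                # 192.168.1.1 as an unsigned integer
signed = to_signed32(ip_as_int)       # -1062731519
dotted = to_dotted_quad(ip_as_int)    # '192.168.1.1'
```

The signed form keeps the same 32 bits, so the original value can be recovered with signed % 2**32.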

    I see integrating it into Dumbo something like this:

    def mapper(data):
        pl = Pipeline(
         DumboInput(data, Schema(sf.replace(encoding_errors='replace') for sf in hub1_combined_page.schema),
          keep=keep, drop=drop, rename=rename)
         |
         etl.transform.TwosComplement('network_ip')
         |
         etl.transform.IPString('ip_address=client_ip_addr')
         |
         etl.transform.StringCleaner()
         |
         GenerateNetworkRev()
         |
         etl.db.DatabaseLoaderGenerator('netezza', load_page_events.schema,
          schema_filter=lambda s: (sf.replace(encoding_errors='replace') for sf in s))
         |
         DumboKeyGenerator('site_id')
        )
        pl.setup()
        return iter(pl)


    DumboKeyGenerator will extract site_id from the input row, assign that as the key, and yield key, row.
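
A rough sketch of what such a key-generating step could look like, assuming rows behave like dicts (the ticket does not say what the framework's row type is). key_generator is a hypothetical stand-in for DumboKeyGenerator and ignores the framework's Pipeline plumbing.

```python
def key_generator(rows, key_field):
    """Yield (key, row) pairs, taking the key from one field of each row.

    Assumption: each row supports lookup by field name, like a dict.
    """
    for row in rows:
        yield row[key_field], row


# Hypothetical rows, as they might leave the earlier pipeline stages.
rows = [
    {"site_id": 7, "ip_address": "192.168.1.1"},
    {"site_id": 3, "ip_address": "10.0.0.1"},
]
pairs = list(key_generator(rows, "site_id"))
```

The resulting (key, row) pairs have the shape a mapper is expected to emit, so the pipeline's iterator can be returned from the mapper as is.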

    I'll submit the patch in a few minutes.
  • Followers
     
    Klaas Bosteels (Assigned To), Daniel Lescohier
     
    Attachments
    2.13 KB Added by Daniel Lescohier on March 18, 2009 09:32 UTC   Details
    699 Bytes Added by Daniel Lescohier on March 18, 2009 09:32 UTC   Details
    Activity
     

          on Mar 18, 2009 @ 09:25am UTC * By Klaas Bosteels

    Sounds good. The performance benefit might not be (easily) noticeable in practice and initialization/cleanup can also be handled by using mapper/reducer classes and putting the initialization/cleanup logic in the constructor/destructor, but I can see some use in providing such a "low-level" mappers/reducers interface in addition to the ones we already have.

          on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier

    Attachment dumbo.patch added
    file:bFKpOge9Or3PXXeJe5afGb: Patch to core for adding new interface

          on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier

    Attachment wordcount_alt.py added
    file:bFTAjse9Or3PXXeJe5afGb: Additional example, using new interface

          on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier

    I didn't realize before I needed a github account, and they're taking forever to create the account for me, so I'm submitting as a patch file against commit 2cbfc1102546db2cd403b1b234367ca7b9c1de77.

          on Mar 18, 2009 @ 10:04am UTC * By Klaas Bosteels

    Status changed from New to Fixed

          on Mar 18, 2009 @ 10:06am UTC * By Klaas Bosteels

    Milestone set to 0.21.3

          on Mar 18, 2009 @ 10:40am UTC * By Daniel Lescohier

    Klaas, I agree with you that the performance benefit would probably be insignificant. As you understood, that wasn't my motivation; the goal was to be able to integrate Dumbo with other frameworks. Thanks for applying the patch.

    I figured out the issue with github: apparently, it doesn't play nice with the Konqueror browser. I logged in successfully with Firefox, and now I have the fork button. So, in the future, I can submit patches properly.