Followers
Klaas Bosteels (Assigned To) , Daniel Lescohier
Attachments
2.13 KB Added by Daniel Lescohier on March 18, 2009 09:32 UTC Details
699 Bytes Added by Daniel Lescohier on March 18, 2009 09:32 UTC Details
Activity
on Mar 18, 2009 @ 09:25am UTC * By Klaas Bosteels
Sounds good. The performance benefit might not be (easily) noticeable in practice and initialization/cleanup can also be handled by using mapper/reducer classes and putting the initialization/cleanup logic in the constructor/destructor, but I can see some use in providing such a "low-level" mappers/reducers interface in addition to the ones we already have.
on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier
Attachment dumbo.patch added
file:bFKpOge9Or3PXXeJe5afGb: Patch to core for adding new interface
on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier
Attachment wordcount_alt.py added
file:bFTAjse9Or3PXXeJe5afGb: Additional example, using new interface
on Mar 18, 2009 @ 09:32am UTC * By Daniel Lescohier
I didn't realize before I needed a github account, and they're taking forever to create the account for me, so I'm submitting as a patch file against commit 2cbfc1102546db2cd403b1b234367ca7b9c1de77.
on Mar 18, 2009 @ 10:04am UTC * By Klaas Bosteels
Status changed from New to Fixed
on Mar 18, 2009 @ 10:06am UTC * By Klaas Bosteels
Milestone set to 0.21.3
on Mar 18, 2009 @ 10:40am UTC * By Daniel Lescohier
Klaas, I agree with you that the performance benefit would probably be insignificant. As you understood, that wasn't my motivation; the goal was to be able to integrate Dumbo with other frameworks. Thanks for applying the patch.
I figured out the issue with github: apparently, it doesn't play nice with the Konqueror browser. I logged in successfully with Firefox, and now I have the fork button. So, in the future, I can submit patches properly.
The reason I'm doing this is so that I can integrate my existing Python ETL framework into Dumbo. The ETL framework is designed around a Filters/Pipes pipeline model. The input filter of a pipeline must be given an iterator to iterate over; the framework is not designed to provide a function for something else to call. For instance, the framework has an etl.file.DelimitedInput input filter, which takes a file object as a parameter and then iterates over the file. Another example is the etl.db.Query input filter: it takes a database connection and SQL text as parameters, and iterates over the database cursor (obviously, this input filter would not be used in Hadoop, because I don't want any database connectivity on the Hadoop MapReduce nodes).
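As a rough illustration of this iterator-driven design, here is a minimal sketch of an input filter that is handed an iterable and pulls records from it itself. The class name DelimitedRows and its parameters are hypothetical stand-ins for illustration only; they are not the actual etl.file.DelimitedInput API.

```python
import io


class DelimitedRows:
    """Hypothetical input filter: wraps any iterable of text lines."""

    def __init__(self, lines, fields, delimiter="\t"):
        self.lines = lines          # a file object, a list, or any other iterable
        self.fields = fields        # column names, in order
        self.delimiter = delimiter

    def __iter__(self):
        # The filter drives the iteration itself; nothing calls it once per record.
        for line in self.lines:
            values = line.rstrip("\n").split(self.delimiter)
            yield dict(zip(self.fields, values))


# Any iterable of lines works as the source, here an in-memory file.
source = io.StringIO("1\talpha\n2\tbeta\n")
rows = list(DelimitedRows(source, ["id", "name"]))
```

Because the filter only needs something to iterate over, a mapper that receives its input as an iterator could hand that iterator to the filter directly.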
So, the alternative interface passes each mapper and reducer an iterator as its parameter. Then, functions would be written like:
    def mapper(data):
        # Initialization goes here
        for key, value in data:
            for word in value.split():
                yield word, 1
        # Cleanup goes here

    def reducer(data):
        # Initialization goes here
        for key, values in data:
            yield key, sum(values)
        # Cleanup goes here

There are also some other advantages to this interface:
Here's an example of a portion of an ETL framework script:
    pl = Pipeline(
        DelimitedInput(
            sys.stdin,
            Schema(sf.replace(encoding_errors='replace')
                   for sf in hub1_combined_page.schema),
            keep=keep, drop=drop, rename=rename)
        | etl.transform.TwosComplement('network_ip')
        | etl.transform.IPString('ip_address=client_ip_addr')
        | etl.transform.StringCleaner()
        | GenerateNetworkRev()
        | etl.db.DatabaseLoaderFile(
            sys.stdout, 'netezza', load_page_events.schema,
            schema_filter=lambda s: (sf.replace(encoding_errors='replace') for sf in s),
            insert_filters=extra_filters)
    ).do()

This is doing:
I see the integration into Dumbo looking something like this:
    def mapper(data):
        pl = Pipeline(
            DumboInput(
                data,
                Schema(sf.replace(encoding_errors='replace')
                       for sf in hub1_combined_page.schema),
                keep=keep, drop=drop, rename=rename)
            | etl.transform.TwosComplement('network_ip')
            | etl.transform.IPString('ip_address=client_ip_addr')
            | etl.transform.StringCleaner()
            | GenerateNetworkRev()
            | etl.db.DatabaseLoaderGenerator(
                'netezza', load_page_events.schema,
                schema_filter=lambda s: (sf.replace(encoding_errors='replace') for sf in s))
            | DumboKeyGenerator('site_id')
        )
        pl.setup()
        return iter(pl)

DumboKeyGenerator will extract site_id from the input row, assign that as the key, and yield key, row.
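The behaviour described for DumboKeyGenerator can be sketched as a plain generator. This is only an illustration under the assumption that rows behave like dictionaries; the function name key_generator and the sample rows are hypothetical, and the real filter would be a pipeline stage rather than a bare function.

```python
def key_generator(rows, key_field):
    """Hypothetical final stage: turn each row into a (key, row) pair."""
    for row in rows:
        # Pull the key out of the row and emit it alongside the full row.
        yield row[key_field], row


# Made-up sample rows, only to show the shape of the output.
rows = [
    {"site_id": 7, "ip_address": "10.0.0.1"},
    {"site_id": 9, "ip_address": "10.0.0.2"},
]
pairs = list(key_generator(rows, "site_id"))
```

Each emitted pair has the shape (key, value) that an iterator-style mapper is expected to yield.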
I'll submit the patch in a few minutes.
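For reference, the iterator-style word count mapper and reducer described above can be exercised without Hadoop. The following is a minimal sketch, assuming the framework hands the mapper an iterator of (key, value) pairs and the reducer an iterator of (key, values) pairs; run_locally is a hypothetical helper that imitates the sort-and-group step, not part of Dumbo.

```python
from itertools import groupby
from operator import itemgetter


def mapper(data):
    # Receives an iterator of (key, value) pairs and loops over it itself.
    for key, value in data:
        for word in value.split():
            yield word, 1


def reducer(data):
    # Receives an iterator of (key, values) pairs, one per distinct key.
    for key, values in data:
        yield key, sum(values)


def run_locally(records):
    """Hypothetical stand-in for the framework: map, sort by key, group, reduce."""
    mapped = sorted(mapper(iter(records)), key=itemgetter(0))
    grouped = ((key, (count for _, count in group))
               for key, group in groupby(mapped, key=itemgetter(0)))
    return list(reducer(grouped))


counts = run_locally([(0, "a b a"), (1, "b c")])
```

With this input, counts is [("a", 2), ("b", 2), ("c", 1)]. Any initialization or cleanup code placed before or after the loops would run once per call, which is the point of the interface.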