Example Plugin: Zoho CRM
In this example, we're going to walk through a plugin that loads records into Zoho CRM.
On the outbound side, it illustrates a record-style scenario, where the aim is to keep records in an external SaaS product in sync with a Snowflake table/view.
On the inbound side, we keep warehouse tables in sync with changes being made to CRM records.
Zoho has a complex API limits system, so we also get a chance to do a more advanced rate limit configuration in the plugin.
The full source code for the plugin is here, but I'm going to step through each section and explain what it does.
Please note, this example is illustrative only and uses a minimal amount of code. A production plugin would typically include a lot more error condition checking and logging statements.
Structure
After scrolling past the import statements, the first thing to notice is that the plugin is a single file containing a Python class that inherits from OmnataPlugin.
Manifest
Then we override the get_manifest method and return information about the plugin.
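As a rough sketch (the exact PluginManifest constructor fields and strategy classes are assumptions based on the plugin runtime, so check them against the version you're using), the override looks something like this:

```python
from omnata_plugin_runtime.omnata_plugin import OmnataPlugin
from omnata_plugin_runtime.configuration import PluginManifest  # assumed import path

class ZohoCrmPlugin(OmnataPlugin):
    def get_manifest(self) -> PluginManifest:
        # Field names are illustrative; consult the runtime docs for the
        # exact constructor in your version
        return PluginManifest(
            plugin_id="zoho_crm",
            plugin_name="Zoho CRM",  # the label shown in the Omnata Sync UI
            developer_id="example",
            developer_name="Example Developer",
            docs_url="https://example.com/docs",  # hypothetical URL
            supports_inbound=True,
            # the real runtime provides strategy objects for these
            supported_outbound_strategies=["Create", "Update", "Upsert"],
        )
```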
The label determines what the app is named in the Omnata Sync UI:
We support the Create, Update, and Upsert strategies because those are the record import operations that Zoho's bulk write API supports. Since we capture the Zoho record ID during insert and update, it would also be possible to extend the plugin to support deletes via the 'mirror' strategy, but that's beyond the scope of this example.
We only support the "Send" outbound strategy because most people live in the moment in Slack. We could build a plugin that updates and deletes previous Slack messages after the original Snowflake record is updated or deleted, but that's probably overkill, right?
Connecting to the app
When a user selects a plugin, the first thing we ask is that they connect to their instance of that application. To do this, the connection_form method is called so that we can render a form to display to the user.
In our case, the user will create a Server Based Application in the Zoho API console and provide the Client ID and Client Secret.
Here's the full function:
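A trimmed sketch of its shape, with the runtime's form classes assumed rather than verified:

```python
from omnata_plugin_runtime.forms import (  # assumed import path
    ConnectionMethod,
    FormInputField,
    FormDropdownField,
    FormOption,
    StaticFormOptionsDataSource,
)

def connection_form(self):
    # One connection method, gathering the data centre (which drives the
    # API hostnames and rate limits) plus the OAuth client credentials
    # created in the Zoho API console
    return [
        ConnectionMethod(
            name="OAuth",
            fields=[
                FormDropdownField(
                    name="data_center",
                    label="Data Centre",
                    data_source=StaticFormOptionsDataSource(
                        options=[
                            FormOption(value="com", label="US (zoho.com)"),
                            FormOption(value="eu", label="Europe (zoho.eu)"),
                            FormOption(value="com.au", label="Australia (zoho.com.au)"),
                        ]
                    ),
                ),
                FormInputField(name="client_id", label="Client ID"),
                FormInputField(name="client_secret", label="Client Secret", secret=True),
            ],
        )
    ]
```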
There's quite a lot going on here, mainly because we have to gather enough information to determine the rate limits, and also which data centre hosts the user's account (this determines the URLs we hit).
Notice too that our form was wrapped in a ConnectionMethod and we return a list of these. If the app has multiple ways of connecting (e.g. API keys, credentials, SSH keys), we can provide a list of options to the user.
After all connection parameters have been gathered, and any OAuth flows completed, the plugin will be asked which network addresses need to be permitted:
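A minimal sketch of that method, assuming the runtime's network_addresses hook and parameter accessor look roughly like this:

```python
def network_addresses(self, parameters) -> list:
    # Permit only the hosts for the user's own data centre
    # (get_connection_parameter is the assumed accessor here)
    dc = parameters.get_connection_parameter("data_center").value
    return [
        f"accounts.zoho.{dc}",
        f"www.zohoapis.{dc}",
        f"content.zohoapis.{dc}",
    ]
```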
Notice that the network addresses vary based on which data centre the user is in. Of course we could have just added all of the possible URLs, but it's good practice to limit what's permitted.
The user will be shown these (just for their own peace of mind), and then a connection test will be performed by invoking the connect function:
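A sketch of that test, with the parameter accessors and ConnectResponse constructor assumed:

```python
import requests
from omnata_plugin_runtime.omnata_plugin import ConnectResponse  # assumed import path

def connect(self, parameters):
    dc = parameters.get_connection_parameter("data_center").value
    # access_token is created by the OAuth flow and read like any other
    # connection value (see the note below)
    token = parameters.get_connection_secret("access_token").value
    response = requests.get(
        f"https://www.zohoapis.{dc}/crm/v3/org",
        headers={"Authorization": f"Zoho-oauthtoken {token}"},
    )
    response.raise_for_status()
    zgid = response.json()["org"][0]["zgid"]
    # Store the org's zgid against the connection so later sync runs
    # don't need to fetch it again
    return ConnectResponse(connection_parameters={"zgid": zgid})
```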
Here, we're querying organisation information as a connection test. And because the 'zgid' is used by some other API requests, we store it against the connection so that we don't have to keep fetching it each sync run.
Note that "access_token" is a special parameter created by the OAuth connection process; in other words, you access it like any other Omnata connection form value, instead of using the Snowflake secrets SDK.
Next, we see the API limits defined:
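In sketch form, and with the rate limiting class names assumed rather than verified against the runtime:

```python
from omnata_plugin_runtime.rate_limiting import ApiLimits, RequestRateLimit  # assumed names

def api_limits(self, parameters):
    # Zoho grants a pool of API credits per 24-hour window, scaled by the
    # licences on the account; api_credits() (shown below) holds that logic
    daily_credits = self.api_credits(parameters)
    return [
        ApiLimits(
            endpoint_category="rest_api",
            request_rates=[
                # constructor arguments are illustrative; the real runtime
                # uses enums for the time unit
                RequestRateLimit(request_count=daily_credits, time_unit="day", unit_count=1),
            ],
        ),
    ]
```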
Here we are calculating the rate limits based on which Zoho licenses the account has, for the different endpoints we use (the api_credits function contains this logic).
These rate limits will appear like this to the user, and they can be overridden:
More information on rate limits can be found at Rate Limiting.
Outbound Syncs
Configuration Form
Now that the connection to the app has been established and we know our source table, we can render the form that configures outbound syncs.
The Zoho scenario needs a few pieces of configuration, gathered by a form like the sketch below.
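Here's roughly what building that form could look like; the form classes, the depends_on attribute, and the data source wiring are all assumptions about the runtime's API:

```python
from omnata_plugin_runtime.forms import (  # assumed import path
    OutboundSyncConfigurationForm,
    FormDropdownField,
    FormFieldMappingSelector,
    DynamicFormOptionsDataSource,
)

def outbound_configuration_form(self, parameters):
    return OutboundSyncConfigurationForm(
        fields=[
            # The target Zoho module, populated dynamically
            FormDropdownField(
                name="module",
                label="Zoho Module",
                data_source=DynamicFormOptionsDataSource(
                    source_function=self.fetch_module_form_options
                ),
            ),
            # Only rendered once a module is chosen, and only relevant
            # for the Update/Upsert strategies
            FormDropdownField(
                name="find_by",
                label="Find existing records by",
                depends_on="module",
                data_source=DynamicFormOptionsDataSource(
                    source_function=self.fetch_module_fields_form_option
                ),
            ),
        ],
        mapper=FormFieldMappingSelector(
            depends_on="find_by",
            data_source=DynamicFormOptionsDataSource(
                source_function=self.fetch_module_fields_form_option
            ),
        ),
    )
```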
With the resulting form, we'll first ask which Zoho module the user wants to load records into. These are populated into a dropdown field via the fetch_module_form_options function.
Additionally, if Upsert or Update is used as a strategy, the user must choose a unique field to match existing records on. Note that because it depends on the module field, it won't be rendered until after that first selection is made, so the fetch_module_fields_form_option function will be provided with that module value as a parameter.
Finally, we use a FormFieldMappingSelector to present the user with a mapping of columns to fields. The list of fields to map onto is sourced via the same fetch_module_fields_form_option function as the find_by dropdown.
Here's what the whole form looks like to the user, if they choose the Upsert strategy:
Sync the Records
Now for the exciting part!
On whatever schedule the user chooses, the sync_outbound method on our plugin class will be called. It's the job of that method to load the records into the app, and report back on the outcome.
First, it's important to know that before this function is invoked, the plugin runtime has already modified Python's http package to enforce the rate limits which have been set. This means that you don't need to worry about waiting in between HTTP requests in your code.
In our function, we have:
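A condensed sketch, with the record accessor and sync parameter accessors assumed:

```python
def sync_outbound(self, parameters, outbound_sync_request):
    # Fetch the source records as batched dataframes and hand them to the
    # managed upload function, along with the configured module and
    # matching field (accessor names are assumptions)
    self.record_upload(
        outbound_sync_request.get_records(batched=True),
        module=parameters.get_sync_parameter("module").value,
        find_by=parameters.get_sync_parameter("find_by").value,
    )
```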
Here we fetch the records into a batched dataframe, which is passed into the record_upload function along with the module and matching-field values from the sync configuration.
The record_upload function is quite long, so I won't show the whole thing here, but we can discuss it.
The first thing to notice is that the method has the @managed_outbound_processing decorator. This decorator is provided by the Omnata Plugin Runtime, and takes care of invoking the function in parallel (if needed), with the appropriate batch size.
In our case, we've set a concurrency of 5 so that up to five requests can hit Zoho's APIs concurrently (still subject to the rate limits), and the method is invoked with up to 25,000 records at a time in the dataframe, which is the maximum that the Zoho bulk APIs will accept.
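In other words, the declaration looks something like this (the extra parameters after the dataframe are illustrative):

```python
from omnata_plugin_runtime.omnata_plugin import managed_outbound_processing

@managed_outbound_processing(concurrency=5, batch_size=25000)
def record_upload(self, data_frame, module: str, find_by: str):
    # The runtime slices the source data into dataframes of up to
    # 25,000 rows and runs up to five of these calls concurrently
    ...
```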
The advantage here is twofold:
We don't have to split up the data ourselves
Working in batches is memory efficient. The plugin runtime will request more data from Snowflake only when needed, so you can process very large payloads with only an XS warehouse.
Inside the method, we take the following steps (sketched in condensed form after this list):
Convert the dataframe's 'TRANSFORMED_RECORD' json column to CSV and zip it up
Upload the zip file to the Zoho content API
Create a Bulk Write job, pointing to the uploaded file
Wait until the job completes
Download the import report (a CSV file inside a zip file), parse it into a dataframe, and match the results back onto the original dataframe
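The shape of those steps, in a condensed sketch; the endpoint paths and payload fields are paraphrased from Zoho's bulk write API and may not be exact, and the report-parsing step is left as a comment:

```python
import io
import json
import time
import zipfile

import pandas
import requests

def upload_and_import(self, data_frame, api_base, content_base, headers, zgid, module, find_by):
    # 1. Expand the TRANSFORMED_RECORD JSON column into a CSV, then zip it
    records = pandas.json_normalize(data_frame["TRANSFORMED_RECORD"].apply(json.loads))
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w") as archive:
        archive.writestr("records.csv", records.to_csv(index=False))
    # 2. Upload the zip to the content API; bulk write uploads want the
    # org's zgid in a header, which is why we stored it at connect time
    upload = requests.post(
        f"{content_base}/crm/v3/upload",
        headers={**headers, "feature": "bulk-write", "X-CRM-ORG": zgid},
        files={"file": ("records.zip", buffer.getvalue())},
    ).json()
    # 3. Create a bulk write job pointing at the uploaded file
    job = requests.post(
        f"{api_base}/crm/bulk/v3/write",
        headers=headers,
        json={
            "operation": "upsert",
            "resource": [{
                "type": "data",
                "module": module,
                "find_by": find_by,
                "file_id": upload["details"]["file_id"],
            }],
        },
    ).json()
    # 4. Poll until the job completes
    while True:
        status = requests.get(
            f"{api_base}/crm/bulk/v3/write/{job['details']['id']}",
            headers=headers,
        ).json()
        if status["status"] in ("COMPLETED", "FAILED"):
            break
        time.sleep(10)
    # 5. The completed job exposes a download URL for the import report
    # (a CSV inside a zip), which we'd parse and join back onto data_frame
    return status
```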
The sync_outbound method is responsible for returning a dataframe containing:
IDENTIFIER - the identifier provided in the source dataframe
APP_IDENTIFIER - the identifier from the remote system. A value is only necessary if you plan to update/delete using this
SUCCESS - a boolean flag to indicate success or failure. In our Zoho plugin, we look for a STATUS csv column value of ADDED or UPDATED to indicate success.
RESULT - a JSON object containing further details if necessary, including an error message
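Assembling that dataframe might look like this, assuming report_df is the parsed import report aligned row-for-row with the uploaded batch (the report column names here are illustrative):

```python
import json
import pandas

results = pandas.DataFrame({
    "IDENTIFIER": data_frame["IDENTIFIER"].values,
    "APP_IDENTIFIER": report_df["RECORD_ID"].values,
    # ADDED/UPDATED in the report's STATUS column means success
    "SUCCESS": report_df["STATUS"].isin(["ADDED", "UPDATED"]).values,
    "RESULT": [json.dumps({"status": s}) for s in report_df["STATUS"]],
})
return results
```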
Normally you can use the enqueue_results function on the outbound_sync_request parameter to send these results back as you process them. Because we're doing our work inside the @managed_outbound_processing-annotated method, we can just pass back a response for the records we're given on each iteration.
Inbound Syncs
Let's take a look at fetching data from Zoho CRM. In this example, we'll support fetching the records from each of the CRM modules.
Inbound syncs work differently to outbound syncs, and typically their configuration is a lot simpler. Instead of describing how source records relate to entities/events in the target app, you're just fetching data from the app in whatever form it's in, to be stored in local tables in Snowflake.
Configuration Form
In this case, we don't need any initial information from the user:
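So the configuration form method can just return an empty form; the class name here is an assumption about the runtime:

```python
def inbound_configuration_form(self, parameters):
    # Nothing to configure up front; stream selection happens in the
    # next step (InboundSyncConfigurationForm is an assumed class name)
    return InboundSyncConfigurationForm(fields=[])
```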
Inbound Stream List
Every plugin which supports inbound syncs must also implement the inbound_stream_list function:
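A sketch of its shape; the StreamConfiguration class, the currently_selected_streams accessor, and the two helper functions are all assumptions for illustration:

```python
def inbound_stream_list(self, parameters):
    selected = parameters.currently_selected_streams  # assumed accessor
    streams = []
    # fetch_modules is a hypothetical helper wrapping Zoho's modules API
    for module in self.fetch_modules(parameters):
        streams.append(
            StreamConfiguration(  # assumed runtime class
                stream_name=module["api_name"],
                # Only pay the cost of schema discovery for streams the
                # user has actually selected (see below)
                json_schema=self.fetch_module_schema(parameters, module)
                if selected and module["api_name"] in selected
                else None,
            )
        )
    return streams
```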
Here we fetch a list of modules from the Zoho API, and add them all to the list of streams.
This will be presented to the user like so:
You might have noticed this section:
When the inbound_stream_list function is invoked, initially the currently_selected_streams list is None, which means we just want a full list of all possible streams. When this happens, it's OK to return a json_schema of None, as it could be very slow and expensive to ask the system for every possible field for every single object.
However, subsequently (after the user has made stream selections), that list may contain the names of streams, and in that case you should return their schema.
Whenever a json_schema is provided, it allows the Omnata engine to generate views over the raw JSON and extract the values, which is a nicer experience for the user.
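For instance, a selected "Leads" stream might return a schema along these lines (field names illustrative):

```python
json_schema = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "Last_Name": {"type": "string"},
        "Company": {"type": "string"},
        "Modified_Time": {"type": "string", "format": "date-time"},
    },
}
```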
Similarly to sync_outbound, there's a function named sync_inbound which is invoked whenever new records are to be read from the other app.
Here we're just passing the list of streams over to the fetch_records function, which is decorated with managed_inbound_processing.
This method will be invoked in parallel (up to 5 at a time), each invocation passing in a single stream to fetch.
The process uses the bulk read API in a similar fashion to the outbound sync (CSVs inside zip files); however, after the data is fetched, we use enqueue_results to provide the stream data back to the engine, along with the latest state information.
The state information (max_modified_time in our case) isn't really necessary, but we'd need it if we were doing incremental loads.
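Putting the inbound pieces together in sketch form; the streams accessor, the bulk_read_records helper, and the enqueue_results signature are assumptions:

```python
from omnata_plugin_runtime.omnata_plugin import managed_inbound_processing

def sync_inbound(self, parameters, inbound_sync_request):
    # Hand the selected streams to the managed fetch function
    self.fetch_records(inbound_sync_request.streams, inbound_sync_request)

@managed_inbound_processing(concurrency=5)
def fetch_records(self, stream, inbound_sync_request):
    # Invoked once per stream, up to five streams in parallel. The
    # bulk-read plumbing (CSVs inside zip files, as on the outbound
    # side) is hidden in a hypothetical helper here
    records = self.bulk_read_records(stream.stream_name)
    new_state = {"max_modified_time": max(r["Modified_Time"] for r in records)}
    # Stream the data back to the engine as we go, along with the
    # latest state (signature assumed)
    inbound_sync_request.enqueue_results(stream.stream_name, records, new_state)
```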
Next Steps
Now that you've seen how a complete plugin can work, be sure to check out our tutorial on the Omnata Plugin Devkit. It'll show you how to upload a plugin, interactively develop in a notebook, and automate creation of test cases.