One key factor in being able to translate the Apache Camel xml flows into Symfony services is the FlowsBuilder
compiler pass.
This is executed when the Symfony container is generated, the first time the application is executed.
The first thing that happens in this compiler pass is to get the list of the different ProcessorDefinition
the bundle supports.
A ProcessorDefintion
is a service that defines how a particular element in the Apache Camel xml flow needs to be parsed.
For instance a multicast
element in the xml flow will be parsed in a different way than a recipientList
element.
All these ProcessorDefintion
services are defined in CamelConfig, in the file Resources/config/services.yml
, and are tagged
with the name smartesb.definitions
, which is the tag name the compiler pass uses to find these services. See below some examples:
services:
smartesb.definitions.multicast:
class: Smartbox\Integration\CamelConfigBundle\ProcessorDefinitions\MulticastDefinition
tags:
- { name: smartesb.definitions, nodeName: multicast }
calls:
- [setProcessorClass, ["Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\Multicast"]]
smartesb.definitions.pipeline:
class: Smartbox\Integration\CamelConfigBundle\ProcessorDefinitions\PipelineDefinition
tags:
- { name: smartesb.definitions, nodeName: pipeline }
calls:
- [setProcessorClass, ["Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\Pipeline"]]
smartesb.definitions.recipient_list:
class: Smartbox\Integration\CamelConfigBundle\ProcessorDefinitions\RecipientListDefinition
tags:
- { name: smartesb.definitions, nodeName: recipientList }
calls:
- [setEvaluator, ["@smartesb.util.evaluator"]]
- [setProcessorClass, ["Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\RecipientList"]]
Another important point, when these services are defined is the addition of the nodeName
. This is the name that the node has in the flow xml file,
as you can see in the following example for multicast
and pipeline
.
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:camel="http://camel.apache.org/schema/spring"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
<camelContext trace="false" xmlns="http://camel.apache.org/schema/spring">
<route>
<from uri="api://execute/skeleton/v0/broadcastping"/>
<multicast strategyRef="fireAndForget">
<pipeline>
<to uri="rest://remote_system_api/sendPingMessage">
<description>Sends a ping message to remote_system_api</description>
</to>
</pipeline>
</multicast>
</route>
</camelContext>
</beans>
With these ProcessorDefintion
the next stage is to register these in the processorDefintionRegistry
.
This service will be used in other steps of the complier pass to get references of these definitions.
Once all this has been done, the next stage is to get the directories and the version of the flows xml files that should
be loaded in the application. The flows_directories
and frozen_flows_directory
fields are defined in CamelConfig
config and the flows_version
is defined in FrameworkBundle config. To get these, the compiler pass is getting
the extension class of both bundles…
// This loads the class DependencyInjection/SmartboxIntegrationCamelConfigExtension.php from CamelConfig bundle
$extension = $container->getExtension('smartbox_integration_camel_config');
// This loads the class DependencyInjection/SmartboxIntegrationFrameworkExtension.php from Framework bundle
$frameworkExtension = $container->getExtension('smartbox_integration_framework');
… and from these 2 classes, the compiler pass will call different methods to get this information.
The third step in this process is to load the current version of the flows if the flow version setup doesn’t belong to any frozen flow version.
In this step, the first thing that happens is to get all the xml
filenames inside the flows_directories
path.
For each xml
file, the compiler pass will load the content of the xml
file and will try to build the flow
using Itineraries
and Processors
. In a flow xml file we can find 3 different elements:
multicast
, recipientList
, pipeline
, transformer
and so on. (Processor)This can be observed in the following example:
<from uri="api://execute/skeleton/v0/broadcastping"/> <!-- "from" uri node -->
<multicast strategyRef="fireAndForget"> <!-- "multicast" processor -->
<pipeline> <!-- "pipeline" processor -->
<to uri="rest://remote_system_api/sendPingMessage"> <!-- "to" uri node -->
<description>Sends a ping message to remote_system_api</description>
</to>
</pipeline>
</multicast>
Each flow will have a unique name inside the application. This name is based on the name of the flow file and the version of
the flow.
The way to connect all the xml nodes in a flow is through Itineraries
. An Itinerary
(full namespace
Smmartbox\Integration\FrameworkBundle\Core\Itinerary\Itinerary
) is a service where a list of processor ids are stored.
Each xml node has a processor id after the flow is built and the Itinerary
tells which is the next step to be executed
in a flow.
For each flow, a main Itinerary
is generated assigning the uri defined in the from
xml node.
If an xml node is referring to the to
uri node, then an EndpointProcessor
is built assigning the to
uri, generating and
and registering a processor id for that endpoint that will be added to the Itinerary
.
If an xml node is referring to something different than the from
or to
uri nodes, then a Processor
is built.
To determine which Processor
needs to be built, the compiler pass will use the processorDefintionRegistry
service mentioned
early to try to get the Definition
of that node. With that Definition
, the compiler pass knows how that xml node needs
to be parsed and configured, so that, at the end the Processor
can be built with a processor id that will be added to the
Itinerary
. For instance, if the xml node is defined as multicast
, then the compiler
pass will get the MulticastDefinition
and from there the MulticastProcessor
will be built.
The last step executed in the FlowsBuilder
compiler pass is to load the different frozen versions of the flows that are
defined in frozen_flows_directory
, the same way it loaded the current version of the flows.
If we take the example of the broadcastping
flow xml mentioned before in this documentation, at the end of the compiler pass
execution we will have something like the following in appProjectContainer.php
file /app/cache
folder
Associate the from
uri of the xml node to the main Itinerary
id
protected function getSmartesb_Map_ItinerariesService()
{
$this->services['smartesb.map.itineraries'] = $instance = new \Smartbox\Integration\FrameworkBundle\Configurability\Routing\ItinerariesMap();
$instance->addItinerary('v0-api://execute/skeleton/v0/broadcastping', '_sme_it_v0.v0_broadcast_ping'); // Main Itinerary id
return $instance;
}
Main Itinerary
created for that particular flow
protected function getSmeItV0_V0BroadcastPingService()
{
$this->services['_sme_it_v0.v0_broadcast_ping'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Itinerary\Itinerary('v0_broadcast_ping'); // Unique flow name
$instance->id = '_sme_it_v0.v0_broadcast_ping';
$instance->addProcessorId('v0._sme_pr_broadcast_ping_1'); // Next step to be executed in the flow
return $instance;
}
First step in the Itinerary
, defined as next step in the main Itinerary
. In this case is to execute the Multicast
processor
protected function getV0_SmePrBroadcastPing1Service()
{
$this->services['v0._sme_pr_broadcast_ping_1'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\Multicast();
$instance->id = 'v0._sme_pr_broadcast_ping_1';
$instance->setEventDispatcher($this->get('event_dispatcher'));
$instance->setMessageFactory($this->get('smartesb.message_factory'));
$instance->setValidator($this->get('smartcore.validation.validator'));
$instance->setAggregationStrategy('fireAndForget');
$instance->addItinerary($this->get('_sme_it_v0.v0.010fa5ac574e2312fd000e5994a5e222b0fc23d1')); // New itinerary added for pipeline node
return $instance;
}
Sub Itinerary
defined related to the pipeline
node
protected function getSmeItV0_V0_010fa5ac574e2312fd000e5994a5e222b0fc23d1Service()
{
$this->services['_sme_it_v0.v0.010fa5ac574e2312fd000e5994a5e222b0fc23d1'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Itinerary\Itinerary('v0.010fa5ac574e2312fd000e5994a5e222b0fc23d1');
$instance->id = '_sme_it_v0.v0.010fa5ac574e2312fd000e5994a5e222b0fc23d1';
$instance->addProcessorId('v0._sme_pr_broadcast_ping_2'); // Next step to be executed in the flow
return $instance;
}
Second step in the Itinerary
, defined as next step in the sub Itinerary
of the pipeline
node. In this
case is to execute the Pipeline
processor
protected function getV0_SmePrBroadcastPing2Service()
{
$this->services['v0._sme_pr_broadcast_ping_2'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Processors\Routing\Pipeline();
$instance->id = 'v0._sme_pr_broadcast_ping_2';
$instance->setEventDispatcher($this->get('event_dispatcher'));
$instance->setMessageFactory($this->get('smartesb.message_factory'));
$instance->setValidator($this->get('smartcore.validation.validator'));
$instance->setItinerary($this->get('_sme_it_v0.v0.422cc2456d19812c769153fead550a26b388687a'));
return $instance;
}
Sub Itinerary
defined related to the to
node
protected function getSmeItV0_V0_422cc2456d19812c769153fead550a26b388687aService()
{
$this->services['_sme_it_v0.v0.422cc2456d19812c769153fead550a26b388687a'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Itinerary\Itinerary('v0.422cc2456d19812c769153fead550a26b388687a');
$instance->id = '_sme_it_v0.v0.422cc2456d19812c769153fead550a26b388687a';
$instance->addProcessorId('v0._sme_pr_broadcast_ping_3'); // Next step to be executed in the flow
return $instance;
}
Third, and last step in the Itinerary
, defined as next step in the sub Itinerary
of the to
node.
In this case is to execute the Endpoint
processor
protected function getV0_SmePrBroadcastPing3Service()
{
$this->services['v0._sme_pr_broadcast_ping_3'] = $instance = new \Smartbox\Integration\FrameworkBundle\Core\Processors\EndpointProcessor();
$instance->id = 'v0._sme_pr_broadcast_ping_3';
$instance->setEventDispatcher($this->get('event_dispatcher'));
$instance->setMessageFactory($this->get('smartesb.message_factory'));
$instance->setValidator($this->get('smartcore.validation.validator'));
$instance->setEndpointFactory($this->get('smartesb.endpoint_factory'));
$instance->setURI('rest://remote_system_api/sendPingMessage');
$instance->setDescription('Sends a ping message to remote_system_api');
return $instance;
}