Extremely Serious

Category: Apache Camel (Page 2 of 2)

Apache Camel Transformer

Apache Camel's Transformer EIP enables declarative, type-safe message conversion between POJOs in routes. Below is a fully self-contained, single-file example using nested classes and Camel's Main class.

Complete Single-File Example

import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Apache Camel Transformer EIP example demonstrating declarative type-safe message conversion.
 * All classes (Lead, Customer, LeadToCustomerTransformer) are in this single file.
 */
public class TransformerDemo {
    private static final Logger LOG = LoggerFactory.getLogger(TransformerDemo.class);

    /**
     * Lead POJO - Input model
     */
    public static class Lead {
        private String name;
        private String company;
        private String city;

        public Lead() {}

        public Lead(String name, String company, String city) {
            this.name = name;
            this.company = company;
            this.city = city;
        }

        // Getters and setters
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getCompany() { return company; }
        public void setCompany(String company) { this.company = company; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }

        @Override
        public String toString() {
            return "Lead{name='" + name + "', company='" + company + "', city='" + city + "'}";
        }
    }

    /**
     * Customer POJO - Output model
     */
    public static class Customer {
        private String fullName;
        private String organization;
        private String location;

        public Customer() {}

        public Customer(String fullName, String organization, String location) {
            this.fullName = fullName;
            this.organization = organization;
            this.location = location;
        }

        // Getters and setters
        public String getFullName() { return fullName; }
        public void setFullName(String fullName) { this.fullName = fullName; }
        public String getOrganization() { return organization; }
        public void setOrganization(String organization) { this.organization = organization; }
        public String getLocation() { return location; }
        public void setLocation(String location) { this.location = location; }

        @Override
        public String toString() {
            return "Customer{fullName='" + fullName + "', organization='" + organization + "', location='" + location + "'}";
        }
    }

    /**
     * Transformer implementation for converting Lead to Customer
     */
    public static class LeadToCustomerTransformer extends Transformer {
        @Override
        public void transform(Message message, DataType fromType, DataType toType) throws Exception {
            Lead lead = message.getMandatoryBody(Lead.class);
            Customer customer = new Customer(
                lead.getName().toUpperCase(),
                lead.getCompany(),
                lead.getCity()
            );
            message.setBody(customer);
        }
    }

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Register the transformer
                transformer()
                    .fromType("java:TransformerDemo$Lead")
                    .toType("java:TransformerDemo$Customer")
                    .withJava(LeadToCustomerTransformer.class);

                // Main route with automatic transformation
                from("direct:start")
                    .inputType("java:TransformerDemo$Lead")
                    .log("INPUT Lead: ${body}")
                    .to("direct:process")  // Triggers transformer automatically
                    .log("OUTPUT Customer: ${body}");

                from("direct:process")
                    .inputType("java:TransformerDemo$Customer")
                    .log("PROCESSED Customer: ${body}")
                    .to("log:final?showAll=true");
            }
        });

        // Add a startup listener to send test message after Camel starts
        main.addMainListener(new org.apache.camel.main.MainListenerSupport() {
            @Override
            public void afterStart(org.apache.camel.main.BaseMainSupport mainSupport) {
                try {
                    LOG.info("Sending test Lead message...");
                    mainSupport.getCamelContext().createProducerTemplate()
                        .sendBody("direct:start", new Lead("John Doe", "Acme Corp", "Auckland"));
                    LOG.info("Test message sent successfully!");
                } catch (Exception e) {
                    LOG.error("Failed to send test message", e);
                }
            }
        });

        // Run Camel (will run until Ctrl+C is pressed)
        main.run(args);
    }
}

Expected Console Output

Sending test Lead message...
INPUT Lead: Lead{name='John Doe', company='Acme Corp', city='Auckland'}
PROCESSED Customer: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}
Exchange[Id: D0B1A9A95984A67-0000000000000000, RouteGroup: null, RouteId: route2, ExchangePattern: InOnly, Properties: {CamelToEndpoint=log://final?showAll=true}, Headers: {}, BodyType: TransformerDemo.Customer, Body: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}]
OUTPUT Customer: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}
Test message sent successfully!

Key Implementation Details

Nested Class References: Uses TransformerDemo$Lead syntax to reference nested classes in data type strings.

Transformer Method Signature: Uses transform(Message message, DataType fromType, DataType toType)—the modern Camel 4.x+ API.

Automatic Startup Test: MainListenerSupport.afterStart() sends a test message automatically once the Camel context has started.

Single-File Design: All POJOs, transformer, and routes contained in one compilable .java file.

How the Transformation Triggers

  1. Input validation → inputType("java:TransformerDemo$Lead") expects Lead
  2. Route boundary → to("direct:process") requires Customer input type
  3. Type mismatch → Camel matches Lead→Customer transformer automatically
  4. Conversion executes → LeadToCustomerTransformer.transform() runs
  5. Type contract satisfied → Route continues with Customer

Apache Camel Consumer Types Explained

Apache Camel consumers initiate message processing in routes via the from() endpoint. They fall into two primary categories: event-driven and polling.

Core Classifications

Camel officially recognizes two consumer types—event-driven and polling—each suited to different message sources.

| Type | Description | Mechanism |
|---|---|---|
| Event-Driven | Reacts immediately to incoming events or invocations. No periodic checks. | External push (e.g., JMS) or internal calls (e.g., direct). Uses transport-provided threads. |
| Polling | Camel actively queries the endpoint at set intervals. | Scheduled checks via ScheduledExecutorService. Configurable delay/initialDelay. |

Event-Driven Consumers

These handle real-time messages without polling overhead. Examples include JMS (queue/topic events), HTTP/Netty (request triggers), and in-VM options like direct/SEDA.

  • Direct: Synchronous, blocking calls within the same CamelContext—like method invocation.
  • SEDA: Asynchronous queuing with backpressure control (bounded queue).

Example:

// Event-driven consumer: the route does nothing until a producer delivers
// a message to the in-VM "direct:start" endpoint, then logs the body.
from("direct:start")  // Event-driven: waits for producerTemplate.sendBody()
    .log("Processed: ${body}");

Polling Consumers

Ideal for batch sources like files or databases. Camel polls periodically, ensuring ordered processing.

  • Uses fixed delays by default; supports scheduling options such as delay, initialDelay, and repeatCount.
  • Examples: file (watches directories), FTP/IMAP (remote checks).

Example:

// Polling consumer: Camel checks the "input" directory on a schedule
// (delay=5000 -> 5 seconds between polls) and sends each file to the log endpoint.
// NOTE(review): noop=true presumably leaves the consumed files in place — confirm
// against the Camel File component documentation.
from("file:input?delay=5000&noop=true")  // Polls every 5s
    .to("log:output");

Special Cases: In-VM Consumers

Direct and SEDA are event-driven subtypes for route chaining:

  • Direct: Sync, single-context.
  • SEDA: Async queue.

They differ from transport-based event-driven (e.g., JMS) by lacking external brokers but share passive reception.

When to Use Each

  • Event-Driven: Low-latency, continuous streams (pub/sub, APIs).
  • Polling: Reliable pulls from passive sources (files, legacy systems).

This covers Camel's consumer landscape for effective route design.

Apache Camel SEDA

Apache Camel SEDA implements the Staged Event-Driven Architecture pattern, enabling in-VM asynchronous messaging that decouples producers from consumers via BlockingQueue. This excels in high-load scenarios where synchronous endpoints like Direct would block threads—SEDA queues messages instead, boosting scalability with configurable concurrent consumers.

Core Advantages

  • Non-blocking Producers: Senders complete instantly while slow consumers process from the queue, preventing cascade failures.
  • Thread Pool Efficiency: Multiple consumers (concurrentConsumers=3) parallelize work without manual thread management.
  • Configurable Resilience: Options like queueSize, discardWhenFull, and offerTimeout handle overload gracefully.

Example

This standalone app uses Camel Main (Camel 4.x) with a custom ExchangeFormatter to visualize thread names, exchange IDs, and route context—clearly demonstrating SEDA's parallel consumer threads. Producers fire 5 messages rapidly (100ms intervals) into SEDA, while consumers lag with 1s delays; logs reveal immediate sends followed by staggered, multi-threaded processing.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ExchangeFormatter;

/**
 * Apache Camel example demonstrating SEDA (Staged Event-Driven Architecture) pattern.
 * Shows asynchronous message processing with concurrent consumers and queuing.
 */
/**
 * Demonstrates the SEDA component: a timer route pushes a short burst of messages
 * onto an in-memory queue while a deliberately slow route drains it with three
 * concurrent consumers.
 */
public class SedaExample {

    /**
     * Boots Camel Main with the producer and consumer routes and runs until stopped.
     *
     * <p>Declared {@code public} so the class launches on every JDK, not only on
     * those that accept non-public main methods.
     *
     * @param args command-line arguments forwarded to Camel Main
     * @throws Exception if Camel fails to start or run
     */
    public static void main(String[] args) throws Exception {
        // Create a new Camel Main instance (using fully qualified name to avoid conflict)
        org.apache.camel.main.Main main = new org.apache.camel.main.Main();

        // Add routes with SEDA processing
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Create a custom ExchangeFormatter for detailed log output:
                // thread name, body, exchange id and originating route id.
                ExchangeFormatter customFormatter = exchange ->
                        String.format("[Thread: %s] Body: %s | ExchangeId: %s | RouteId: %s",
                                Thread.currentThread().getName(),
                                exchange.getIn().getBody(String.class),
                                exchange.getExchangeId(),
                                exchange.getFromRouteId());

                // Register the custom formatter in the Camel registry so the log
                // endpoint below can look it up as #customFormatter.
                getContext().getRegistry().bind("customFormatter", customFormatter);

                // SEDA endpoint: queueId=myQueue, concurrent consumers for parallelism
                from("seda:myQueue?concurrentConsumers=3")
                    .log("Processing: ${body}")
                    .delay(1000)  // Simulate slow consumer (1s delay)
                    .to("log:output?showAll=true&exchangeFormatter=#customFormatter");

                // Producer route for demo.
                // delay is only the wait before the first event; period is the gap
                // between events. Without period=100 the timer falls back to its
                // 1000 ms default and the burst never outpaces the consumers.
                from("timer:tick?repeatCount=5&delay=100&period=100")  // Fire 5 msgs quickly
                    .setBody().simple("Msg ${exchangeId}")
                    .log("Sending: ${body}")
                    .to("seda:myQueue");
            }
        });

        // Run Camel (will run until Ctrl+C is pressed)
        main.run(args);
    }
}

Running and Verification

Compile and run the Java class directly (requires Camel 4.14.0 on classpath). Sample output shows the advantage:

Sending: Msg 1CB44DE50955685-0000000000000000     // Producer thread - instant
Sending: Msg 1CB44DE50955685-0000000000000001     // Producer continues rapidly
output - [Thread: Camel (camel-1) thread #6 - Delay] Body: Msg 1CB44DE50955685-0000000000000000 | ExchangeId: 1CB44DE50955685-0000000000000002 | RouteId: route1
output - [Thread: Camel (camel-1) thread #7 - Delay] Body: Msg 1CB44DE50955685-0000000000000001 | ExchangeId: 1CB44DE50955685-0000000000000004 | RouteId: route1  // Parallel consumers

Contrast this with direct:myQueue, where each send blocks on the caller's thread until the delayed consumer finishes. SEDA's queue absorbs bursts and spreads the work across consumer threads, making it well suited to enterprise workloads such as order processing.

Apache Camel Bean EIP

Apache Camel's Bean Enterprise Integration Pattern (EIP) lets you invoke POJO methods directly within routes, seamlessly integrating custom business logic without heavy frameworks. This article presents a fully self-contained, runnable Java example using Camel Main, featuring header manipulation and enhanced logging for real-world demonstration. The code requires only camel-core and camel-main on the classpath.

Key Features Demonstrated

  • Timer-triggered message flow every 2 seconds.
  • Bean method invocation with body and headers parameter binding.
  • Custom header enrichment (processor metadata, timestamps).
  • Split routes using direct for modularity.
  • Infinite runtime via Camel Main (Ctrl+C to stop).

The bean processes the message body, uppercases it, adds headers tracking processing details, and returns the transformed body.

Complete Runnable Code

Here's the entire application in a single file—compile and run directly:

import org.apache.camel.Body;
import org.apache.camel.Headers;
import org.apache.camel.builder.RouteBuilder;

import java.util.Map;

/**
 * Apache Camel example demonstrating bean method invocation in a route.
 * Uses a timer-based route that processes messages through a custom bean.
 */
/**
 * Demonstrates the Bean EIP: a timer route hands each message to a second route
 * that invokes {@link MyProcessor#process} to rewrite the body and add headers.
 */
public class BeanExampleApp {

    /**
     * Boots Camel Main with the timer and bean routes and runs until stopped.
     *
     * <p>Declared {@code public} so the class launches on every JDK, not only on
     * those that accept non-public main methods.
     *
     * @param args command-line arguments forwarded to Camel Main
     * @throws Exception if Camel fails to start or run
     */
    public static void main(String[] args) throws Exception {
        // Create a new Camel Main instance (using fully qualified name to avoid conflict)
        org.apache.camel.main.Main main = new org.apache.camel.main.Main();

        // Add routes with bean processing
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Route 1: Timer that fires every 2 seconds
                from("timer:beanTick?period=2000")
                        .setBody().constant("hello from timer")
                        .log("${body}")
                        .to("direct:process")
                        .log("${body}");  // logs the body as returned by Route 2

                // Route 2: Process the message using a bean
                from("direct:process")
                        .bean(new MyProcessor(), "process")  // Invoke bean method
                        .log("After bean: ${body}")
                        .log("Headers - ProcessedBy: ${header.ProcessedBy}, ProcessedAt: ${header.ProcessedAt}, OriginalBody: ${header.OriginalBody}");
            }
        });

        // Run Camel (will run until Ctrl+C is pressed)
        main.run(args);
    }

    /**
     * Custom processor bean that processes messages.
     * Accepts body as String and headers as Map for manipulation.
     */
    public static class MyProcessor {
        /**
         * Records processing metadata in the headers and returns the rewritten body.
         *
         * @param body    message body, bound via {@code @Body}
         * @param headers message headers, bound via {@code @Headers}; three entries
         *                ({@code ProcessedBy}, {@code ProcessedAt}, {@code OriginalBody})
         *                are added to this map
         * @return the upper-cased body followed by {@code " - processed by MyProcessor"}
         */
        public String process(@Body String body, @Headers Map<String, Object> headers) {
            // Add custom headers
            headers.put("ProcessedBy", "MyProcessor");
            headers.put("ProcessedAt", System.currentTimeMillis());
            headers.put("OriginalBody", body);

            // Process and return the modified body. Locale.ROOT keeps the
            // upper-casing independent of the JVM's default locale.
            return body.toUpperCase(java.util.Locale.ROOT) + " - processed by MyProcessor";
        }
    }
}

Expected Output

Console logs repeat every 2 seconds:

hello from timer
After bean: HELLO FROM TIMER - processed by MyProcessor
Headers - ProcessedBy: MyProcessor, ProcessedAt: 1769775068470, OriginalBody: hello from timer
HELLO FROM TIMER - processed by MyProcessor

Best Practices Highlighted

  • Parameter Binding: Use annotations like @Body, @Headers, @Header("name") for type-safe access; raw types work for simple cases.

  • Method Selection: Explicitly name "process" method to avoid ambiguity with overloads.

  • Inline Beans: Perfect for simple processors; use registry lookup for complex or shared beans.

  • Route Modularity: direct: endpoints enable clean separation of concerns.

This pattern excels for transformations, validations, and custom logic in enterprise integration routes.

Newer posts »