Ron and Ella Wiki Page

Extremely Serious

Apache Camel Variables and VariableReceive in Java DSL

In Camel’s Java DSL, a variable is a key–value pair you associate with an Exchange, a route, or the CamelContext, and VariableReceive is a mode where incoming data is written into variables instead of the Message body and headers. Used properly, these give you a clean, explicit way to manage routing state without polluting payloads or headers.


1. Variables in Java DSL: the mental model

  • A variable is a named value stored on the exchange, route, or CamelContext, accessed with setVariable / getVariable or via EIPs.
  • Variables are separate from the message body, headers, and legacy exchange properties, and are meant as a scratchpad for state you need while routing.

Conceptually, you choose the scope based on who should see the value and how long it should live.


2. Local, global, and route‑scoped variables

2.1 Exchange‑local variable

Exchange‑local variables live only during processing of a single message.

from("direct:local-variable")
    .process(exchange -> {
        // store some state for this exchange only
        exchange.setVariable("customerId", 12345L);
    })
    .process(exchange -> {
        Long id = exchange.getVariable("customerId", Long.class);
        System.out.println("Customer ID = " + id);
    });

Rationale: use exchange‑local variables for state that is logically tied to a single message and must not leak to other messages or routes.


2.2 Global and route‑scoped variables

Global variables live in the CamelContext, and route‑scoped variables are global variables whose keys include a route identifier.

from("direct:global-and-route")
    .routeId("routeA")
    .process(exchange -> {
        CamelContext context = exchange.getContext();

        // global variable (shared across all routes)
        context.setVariable("globalFlag", true);

        // route‑scoped variable using naming convention
        exchange.setVariable("route:routeA:maxRetries", 3);
    })
    .process(exchange -> {
        CamelContext context = exchange.getContext();

        Boolean globalFlag =
            context.getVariable("globalFlag", Boolean.class);

        Integer maxRetries =
            context.getVariable("route:routeA:maxRetries", Integer.class);

        System.out.println("Global flag   = " + globalFlag);
        System.out.println("Max retries A = " + maxRetries);
    });

You can also read a global variable from an exchange with the global: prefix (if you prefer that naming scheme).

Rationale:

  • Global variables are convenient for application‑level flags or configuration computed at runtime.
  • Route‑scoped variables avoid collisions when different routes use similar semantic names like maxRetries or threshold.

3. SetVariable and SetVariables EIPs in Java DSL

While you can always call exchange.setVariable(...) directly, the Java DSL provides EIPs that make variable usage declarative in your routes.

3.1 Set a single variable

from("direct:set-variables")
    // single variable from constant
    .setVariable("status", constant("NEW"))

    // multiple variables in one go
    .setVariables(
        "randomNumber", simple("${random(1,100)}"),
        "copyOfBody",  body()
    )

    .process(exchange -> {
        String status  = exchange.getVariable("status", String.class);
        Integer random = exchange.getVariable("randomNumber", Integer.class);
        String bodyCopy = exchange.getVariable("copyOfBody", String.class);

        System.out.printf(
            "status=%s, random=%d, copy=%s%n",
            status, random, bodyCopy
        );
    });

Rationale: setVariable and setVariables keep the “I am defining variables here” logic inside the DSL layer rather than hiding it in processors, which makes routes easier to read and reason about.


4. VariableReceive on the consumer: fromV

VariableReceive changes how Camel receives data from endpoints: instead of populating the message body and headers, Camel stores them into variables.

Using it on the consumer side is done with fromV (the Java DSL variant of from):

fromV("direct:incoming", "originalBody")
    // body is empty here because the incoming payload is stored in variable "originalBody"
    .transform().simple("Processed: ${body}") // -> "Processed: "
    .process(exchange -> {
        String original =
            exchange.getVariable("originalBody", String.class);
        String processed =
            exchange.getMessage().getBody(String.class);

        System.out.println("Original  = " + original);
        System.out.println("Processed = " + processed);
    });

What actually happens:

  • On entry, Camel stores the incoming body in variable originalBody.
  • The Message body is empty (or at least not the original payload), so ${body} in the transform step produces "Processed: ".
  • Later, you can inspect or restore the original payload by reading the originalBody variable.

Rationale: this is useful when you want to separate transport reception from business payload, or when you want to ensure that early processors do not accidentally work on the raw incoming data.


5. VariableReceive on the producer: toV

You can apply VariableReceive to producer calls with toV, which works like to but puts the response into a variable instead of overwriting the message body.

from("direct:call-service")
    // main message body is here
    .toV("http://localhost:8080/service", null, "serviceReply")
    .process(exchange -> {
        // original body is preserved
        String original =
            exchange.getMessage().getBody(String.class);

        // HTTP response body from the service
        String reply =
            exchange.getVariable("serviceReply", String.class);

        // response header X-Level as header variable
        String level =
            exchange.getVariable("header:serviceReply.X-Level", String.class);

        String combined = "Original=" + original
                        + ", Reply=" + reply
                        + ", Level=" + level;

        exchange.getMessage().setBody(combined);
        System.out.println(combined);
    });

Semantics:

  • Request is sent to http://localhost:8080/service using the current body and headers.
  • Response body is placed into variable serviceReply.
  • Response headers are placed into variables named header:serviceReply.<HeaderName>.[camel.apache]
  • The exchange’s message body and headers after toV remain what they were before the call.

Rationale: this is an enrichment‑style pattern where you treat external responses as additional data points rather than as replacements for your main message.


6. Comparing body, headers, properties, and variables

For Java DSL routes you’ll typically use all four concepts; the key is to pick the right tool for the job.

Kind Scope / lifetime Java access (simplified) Typical use
Body Current message only exchange.getMessage().getBody() Business payload
Header Current message only exchange.getMessage().getHeader("X", T.class) Protocol / metadata (HTTP, JMS, etc.)
Property Current exchange (legacy pattern) exchange.getProperty("key", T.class) Cross‑processor flags, routing metadata
Variable Exchange / route / global repositories exchange.getVariable("key", T.class) or context API Extra state, external responses, config‑like

Rationale: using variables for scratchpad and enrichment state avoids overloading headers or properties with routing concerns, which improves maintainability and refactorability.


7. End‑to‑end Java program example (using only direct:)

This example shows how variables and toV/from can work together without any external HTTP endpoint. We simulate a “discount service” with another direct: route.

import org.apache.camel.CamelContext;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {

            // 1) "Discount service" route, purely in-memory
            from("direct:discountService")
                .process(exchange -> {
                    String originalOrder =
                        exchange.getMessage().getBody(String.class);

                    // pretend we call an external system and compute a discount
                    String discountReply =
                        "discount=10% for order " + originalOrder;

                    exchange.getMessage().setBody(discountReply);
                });

            // 2) Main route using variables + VariableReceive with toV
            from("direct:start")
                // keep the original order body in a variable
                .setVariable("originalOrder", body())

                // call the in-memory discountService; response -> variable "discountReply"
                .toV("direct:discountService", null, "discountReply")

                // build combined response
                .process(exchange -> {
                    String originalOrder =
                        exchange.getVariable("originalOrder", String.class);
                    String discountReply =
                        exchange.getVariable("discountReply", String.class);

                    String result = "Order=" + originalOrder
                                  + ", DiscountService=" + discountReply;

                    exchange.getMessage().setBody(result);
                    System.out.println(result);
                });
        }
    });

    context.start();

    // Send a test order into the main route
    context.createProducerTemplate()
           .sendBody("direct:start", "{\"id\":1,\"total\":100}");

    Thread.sleep(2000);
    context.stop();
}
  • The direct:discountService route stands in for an external dependency but uses only the in‑memory direct component.
  • toV("direct:discountService", null, "discountReply") still demonstrates VariableReceive on a producer: the reply body goes into the discountReply variable; the main message body is preserved.
  • We then combine originalOrder and discountReply explicitly in the final processor, making the variable usage and flow of data very clear.

`onCompletion` and Unit of Work in Apache Camel (Java DSL)

In Java DSL, you declare routes in code, Camel wraps each Exchange in a Unit of Work, and onCompletion is the Java DSL hook that runs a sub‑route when that Unit of Work finishes.


Unit of Work: What Java DSL Developers Should Know

For each incoming Exchange, Camel creates a Unit of Work when it enters a route and tears it down when that route finishes processing the Exchange.

  • It represents “this message’s transaction/lifecycle through this route.”
  • It manages completion callbacks via Synchronization hooks (e.g. onComplete, onFailure).
  • In Java DSL, this is implicit; you usually just work with Exchange and let Camel manage the Unit of Work for you.

You can still access it if you need to register low‑level callbacks:

from("direct:withUoW")
    .process(exchange -> {
        var uow = exchange.getUnitOfWork();
        // uow.addSynchronization(...);
    })
    .to("mock:result");

Why this exists: Camel needs a clear lifecycle boundary to know when to commit transactions, acknowledge messages, and run “finished” logic once per Exchange.


Core onCompletion Concept in Java DSL

The onCompletion DSL lets you attach a small route that runs when the original Exchange’s Unit of Work completes.

  • Camel takes a copy of the Exchange and routes it through the onCompletion block.
  • This behaves “kinda like a Wire Tap”: the original thread can continue or finish while the completion route runs.

Basic Java DSL example:

from("direct:start")
    .onCompletion()
        .to("log:on-complete")
        .to("bean:auditBean")
    .end()
    .process(exchange -> {
        String body = exchange.getIn().getBody(String.class);
        exchange.getMessage().setBody("Processed: " + body);
    })
    .to("mock:result");

Why use it: It centralizes “after everything is done” logic (logging, metrics, notifications) instead of scattering it across processors and finally blocks.


Java DSL Scope: Global vs Route-Level

You can define onCompletion globally (context scope) or inside a specific route (route scope).

Global onCompletion (context scope)

Declared outside any from(...) in configure():

@Override
public void configure() {

    // Global onCompletion – applies to all routes unless overridden
    onCompletion()
        .onCompleteOnly()
        .to("log:global-complete");

    from("direct:one")
        .to("bean:logicOne");

    from("direct:two")
        .to("bean:logicTwo");
}
  • Runs for all routes in this RouteBuilder (and more generally in the context) when they complete successfully.

Route-level onCompletion

Declared inside a route:

@Override
public void configure() {

    // Global
    onCompletion()
        .onCompleteOnly()
        .to("log:global-complete");

    // Route-specific
    from("direct:start")
        .onCompletion().onFailureOnly()
            .to("log:route-failed")
        .end()
        .process(exchange -> {
            String body = exchange.getIn().getBody(String.class);
            if (body.contains("fail")) {
                throw new IllegalStateException("Forced failure");
            }
            exchange.getMessage().setBody("OK: " + body);
        })
        .to("mock:result");
}
  • Route-level onCompletion overrides global onCompletion for that route’s completion behavior.

Why the two levels: Global scope gives you cross‑cutting completion behavior (e.g. auditing for all routes), while route scope gives a particular route fine‑grained control.


Controlling When onCompletion Triggers (Java DSL)

The Java DSL offers fine control on when the completion logic is invoked.

Success vs failure

  • Always (default): no extra flags; runs on both success and failure.

  • Only on success:

    from("direct:successOnly")
      .onCompletion().onCompleteOnly()
          .to("log:success-only")
      .end()
      .to("bean:logic");
  • Only on failure:

    from("direct:failureOnly")
      .onCompletion().onFailureOnly()
          .to("log:failure-only")
      .end()
      .process(exchange -> {
          throw new RuntimeException("Boom");
      })
      .to("mock:neverReached");

Conditional with onWhen

from("direct:conditional")
    .onCompletion().onWhen(body().contains("Alert"))
        .to("log:alert-completion")
    .end()
    .to("bean:normalProcessing");
  • The completion route runs only when the predicate evaluates to true on the completion Exchange (e.g. body contains "Alert").

Why this is useful: It turns onCompletion into a small policy language: “When we’re done, if it failed do X; if it succeeded and condition Y holds, do Z.”


Before vs After Consumer (InOut) in Java DSL

For request–reply (InOut) routes, onCompletion can run either before or after the consumer writes the response back.

Default: after consumer (AfterConsumer)

from("direct:service")
    .onCompletion()
        .to("log:after-response")
    .end()
    .process(exchange -> {
        exchange.getMessage().setBody(
            "Response for " + exchange.getIn().getBody(String.class)
        );
    });
  • The caller receives the response; then the onCompletion sub‑route runs.

Before consumer: modeBeforeConsumer()

from("direct:serviceBefore")
    .onCompletion().modeBeforeConsumer()
        .setHeader("X-Processed-By", constant("MyService"))
        .to("log:before-response")
    .end()
    .process(exchange -> {
        exchange.getMessage().setBody(
            "Response for " + exchange.getIn().getBody(String.class)
        );
    });
  • modeBeforeConsumer() makes completion logic run before the consumer is done and before the response is written back, so you can still modify the outgoing message.

Why you’d choose one:

  • Use AfterConsumer when completion work is purely side‑effect (logging, metrics, notifications).
  • Use BeforeConsumer when completion should influence the final response (headers, correlation IDs, last‑minute logging of the exact response).

Synchronous vs Asynchronous onCompletion in Java DSL

Java DSL lets you decide whether completion runs sync or async.

Synchronous (default)

from("direct:sync")
    .onCompletion()
        .to("log:sync-completion")
    .end()
    .to("bean:work");
  • Completion runs on the same thread; no extra thread pool is used by default (from Camel 2.14+).

Asynchronous with parallelProcessing()

from("direct:async")
    .onCompletion().parallelProcessing()
        .to("log:async-completion")
        .to("bean:slowAudit")
    .end()
    .to("bean:fastBusiness");
  • parallelProcessing() tells Camel to run the completion task asynchronously using a thread pool, so the original thread can complete sooner.

Custom executor with executorServiceRef

@Override
public void configure() {

    // Assume "onCompletionPool" is registered in the CamelContext
    from("direct:customPool")
        .onCompletion()
            .parallelProcessing()
            .executorServiceRef("onCompletionPool")
            .to("bean:expensiveLogger")
        .end()
        .to("bean:mainLogic");
}

Why async: Completion tasks are often slow but non‑critical (e.g. writing to external systems), so running them on a separate thread avoids holding up the main request path.


How onCompletion and Unit of Work Fit Together (Java DSL View)

Bringing it together for Java DSL:

  • Each Exchange in a route is wrapped in a Unit of Work; that’s the lifecycle boundary.
  • At the end of that lifecycle, Camel fires completion hooks (Synchronization) and also activates any onCompletion routes defined in Java DSL.
  • onCompletion routes receive a copy of the Exchange and usually do not affect the already‑completed main route, especially when run async.

Apache Camel Route Templates with Java DSL

Apache Camel route templates in Java DSL let you define a reusable route “shape” and then create many concrete routes from it by supplying parameters. This avoids copy‑pasting similar routes and centralizes changes.


Core idea in Java DSL

In Java DSL, a route template is declared inside a RouteBuilder using routeTemplate("id") and a set of templateParameter(...) entries. Placeholders like {{name}} and {{period}} inside the route definition are filled in when you create concrete routes from the template.

Conceptually:
$$
\text{routeTemplate("id")} + \text{parameters} \Rightarrow \text{concrete route}
$$
This separates what changes (parameters) from what stays the same (the processing steps), so you can add or modify routes by changing parameters instead of duplicating code.


Defining a route template

You define the template in a RouteBuilder just like a normal route, but with routeTemplate and templateParameter.

import org.apache.camel.builder.RouteBuilder;

// Defines the route template using Java DSL
static class MyRouteTemplates extends RouteBuilder {
    @Override
    public void configure() throws Exception {
        routeTemplate("greetingTemplate")
                .templateParameter("name")
                .templateParameter("greeting")
                .templateParameter("period", "3s")

                .from("timer:{{name}}?period={{period}}")
                .setBody(simple("{{greeting}} from route {{name}} at ${date:now:HH:mm:ss}"))
                .log("${body}");
    }
}

Why it’s structured like this:

  • routeTemplate("greetingTemplate") marks this as a template (a blueprint), not a route that runs by itself.
  • Each templateParameter(...) declares one thing that can vary between route instances and can optionally have a default.
  • {{name}}, {{greeting}}, and {{period}} are placeholders; Camel will substitute them when you create real routes from this template.

All variable pieces are explicitly listed as parameters, so the template is easy to understand and validate.


Creating routes from the template with TemplatedRouteBuilder

To turn the template into real routes, you use TemplatedRouteBuilder with the CamelContext and supply parameter values.javadoc+1

import org.apache.camel.CamelContext;
import org.apache.camel.builder.TemplatedRouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Register template
    context.addRoutes(new MyRouteTemplates());

    // Create routes from template using TemplatedRouteBuilder
    TemplatedRouteBuilder.builder(context, "greetingTemplate")
            .routeId("helloFast")
            .parameter("name", "fast")
            .parameter("greeting", "Hello")
            .parameter("period", "2s")
            .add();

    TemplatedRouteBuilder.builder(context, "greetingTemplate")
            .routeId("helloSlow")
            .parameter("name", "slow")
            .parameter("greeting", "Kia ora")
            .parameter("period", "5s")
            .add();

    context.start();

    // Let timers fire for a while
    Thread.sleep(15000);

    context.stop();
}

What’s happening and why:

  • context.addRoutes(new MyRouteTemplates()) loads the template into the CamelContext, but does not yet create concrete routes.
  • Each TemplatedRouteBuilder.builder(context, "greetingTemplate") call:
    • Selects the template by id (greetingTemplate).
    • Assigns a routeId for the route instance.
    • Supplies parameter values (name, greeting, period).
    • Calls .add() to materialize and register a concrete route.

You end up with multiple active routes that share the same logic but differ only in their parameters.


Optional and negated parameters

Special placeholder variants help with optional and Boolean parameters inside templates:

  • {{?param}} → if param is set, include its value; if not, drop the option entirely from the URI.
  • {{!param}} → use the opposite Boolean value of param.

Example with clear naming:

routeTemplate("httpTemplate")
    .templateParameter("path")
    .templateParameter("replyTimeout")
    .templateParameter("disableLogging")

    .from("direct:start")
    .to("http://example.com/{{path}}"
        + "?replyTimeout={{?replyTimeout}}"
        + "&loggingEnabled={{!disableLogging}}");

Behavior and rationale:

  • If replyTimeout is not provided, the replyTimeout option is omitted from the URI entirely, so you avoid dangling replyTimeout=.
  • If disableLogging = true, then {{!disableLogging}} becomes false, so loggingEnabled=false on the endpoint.
  • If disableLogging = false, then loggingEnabled=true.

Use {{!param}} when the semantics of the parameter are intentionally opposite to the endpoint’s option name (e.g., disableX vs enabledX); when they are aligned, simply use {{param}}.

Producer and Consumer Templates in Apache Camel

Apache Camel provides templates so your Java code can easily send to and receive from endpoints without dealing with low‑level Exchange and Endpoint APIs.


What is a ProducerTemplate?

ProducerTemplate is a helper that lets your Java code send messages into Camel endpoints. It wraps the producer side of the Camel API behind simple methods so you can focus on business logic instead of message plumbing.

Key points:

  • Created from a CamelContext using context.createProducerTemplate().
  • Supports one‑way and request–reply interactions (send* vs request* methods).
  • Manages producers and thread usage internally, so you should create it once and reuse it.

The rationale is to decouple your application logic from route definitions: routes describe how data flows, while templates describe when your code chooses to send something into those flows.

ProducerTemplate Example

import org.apache.camel.CamelContext;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Define a simple echo route
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:echo")
                    .setBody(simple("Echo: ${body}"));
        }
    });

    context.start();

    // Create and reuse a ProducerTemplate
    ProducerTemplate producer = context.createProducerTemplate();

    // Fire-and-forget: send a message into the route
    producer.sendBody("direct:echo", "Hello (InOnly)");

    // Request-reply: get a response from the echo route
    String reply = producer.requestBody("direct:echo",
                                        "Hello (InOut)",
                                        String.class);

    System.out.println("Reply from direct:echo = " + reply);

    context.stop();
}

Why this structure:

  • The route from("direct:echo") declares the integration behavior once.
  • ProducerTemplate lets any Java code call that route as if it were a function, with a minimal API.
  • Reusing one template instance avoids creating many producers and threads, which is better for performance and resource usage.

What is a ConsumerTemplate?

ConsumerTemplate is the receiving counterpart: it lets Java code poll an endpoint and retrieve messages on demand. Instead of defining a continuously running from(...) route for everything, you can occasionally pull messages when your logic needs them.

Key points:

  • Created from a CamelContext using context.createConsumerTemplate().
  • Provides blocking (receiveBody), timed (receiveBody(uri, timeout)), and non‑blocking (receiveBodyNoWait) methods.
  • Returns null when a timed or non‑blocking call finds no message.

The rationale is to support imperative “give me at most one message now” patterns, which fit tests, command‑style tools, or workflows controlled by your own scheduler.

ConsumerTemplate Example

The following example uses both templates with the same seda:inbox endpoint, so you can see how sending and receiving fit together.

import org.apache.camel.CamelContext;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    // Route just logs anything arriving on seda:inbox
    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("seda:inbox")
                    .log("Route saw: ${body}");
        }
    });

    context.start();

    // Create and reuse templates
    ProducerTemplate producer = context.createProducerTemplate();
    ConsumerTemplate consumer = context.createConsumerTemplate();

    // 1) Send a message into the SEDA endpoint
    producer.sendBody("seda:inbox", "Hello from ProducerTemplate");

    // 2) Poll the same endpoint using ConsumerTemplate (blocking receive)
    Object body = consumer.receiveBody("seda:inbox");
    System.out.println("ConsumerTemplate received = " + body);

    // 3) Timed receive: wait up to 2 seconds for another message
    Object maybeBody = consumer.receiveBody("seda:inbox", 2000);
    if (maybeBody == null) {
        System.out.println("No more messages within 2 seconds");
    }

    context.stop();
}

Why this structure:

  • Both templates share the same context, so they see the same routes and endpoints.
  • ProducerTemplate pushes a message to seda:inbox, and ConsumerTemplate pulls from that same seda:inbox, clearly demonstrating their complementary roles.
  • The timed receive shows how you can avoid blocking forever, which is important when you control thread lifecycles yourself.

FluentProducerTemplate: an optional fluent variant

FluentProducerTemplate is a fluent wrapper around the producer concept, giving a builder‑like syntax for setting body, headers, and endpoint

Example:

import org.apache.camel.CamelContext;
import org.apache.camel.FluentProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

void main() throws Exception {
    CamelContext context = new DefaultCamelContext();

    context.addRoutes(new RouteBuilder() {
        @Override
        public void configure() {
            from("direct:greet")
                    .setBody(simple("Hi ${header.name}, body was: ${body}"));
        }
    });

    context.start();

    FluentProducerTemplate fluent = context.createFluentProducerTemplate();

    String result = fluent
            .to("direct:greet")
            .withHeader("name", "Alice")
            .withBody("Sample body")
            .request(String.class);

    System.out.println("Fluent reply = " + result);

    context.stop();
}

Why this exists:

  • It makes the send configuration self‑describing at the call site (endpoint, headers, body are all visible in one fluent chain).
  • It is especially handy when constructing slightly different calls to the same route, since you can reuse the template and vary the fluent chain arguments.

Custom Log Masking in Apache Camel

Apache Camel's built-in masking can sometimes truncate after the first match. This article shows how to implement a custom MaskingFormatter that processes entire strings and masks all sensitive fields in JSON, query strings, and mixed formats without truncation.

Why Custom Masking?

The custom formatter:

  • Uses SensitiveUtils.getSensitiveKeys() (~100 built-in keywords like password, secret, token)
  • Adds your custom keywords (userId, ssn, creditCard)
  • Handles both JSON ("key": "value") and query strings (key=value)
  • Uses Matcher.appendReplacement() loop to mask every occurrence
  • Never truncates - processes complete input

Example

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;
import org.apache.camel.spi.MaskingFormatter;
import org.apache.camel.support.SimpleRegistry;
import org.apache.camel.util.SensitiveUtils;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Custom formatter that masks ALL sensitive fields without truncating.
 * Processes entire string and masks all occurrences of sensitive keywords.
 */
static class NonTruncatingMaskingFormatter implements MaskingFormatter {
    private final Pattern sensitivePattern;

    public NonTruncatingMaskingFormatter() {
        // Use SensitiveUtils for ~100 built-in keywords + custom ones
        var sensitiveKeys = new java.util.HashSet<>(SensitiveUtils.getSensitiveKeys());
        sensitiveKeys.add("userId");
        sensitiveKeys.add("ssn");
        sensitiveKeys.add("creditCard");

        String keywordGroup = String.join("|", sensitiveKeys);

        // Matches JSON: "keyword": "value" AND query: keyword=value
        this.sensitivePattern = Pattern.compile(
                "(?i)(?:([\"'])(" + keywordGroup + ")\\1\\s*:\\s*[\"']([^\"']+)[\"']" +  // JSON
                        "|(" + keywordGroup + ")\\s*=\\s*([^&\\s,}]+))",  // Query string
                Pattern.CASE_INSENSITIVE
        );
    }

    @Override
    public String format(String source) {
        if (source == null || source.isEmpty()) {
            return source;
        }

        Matcher matcher = sensitivePattern.matcher(source);
        StringBuilder result = new StringBuilder();

        while (matcher.find()) {
            String replacement;
            if (matcher.group(1) != null) {
                // JSON format: "key": "value" -> "key": "xxxxx"
                String quote = matcher.group(1);
                String keyword = matcher.group(2);
                replacement = quote + keyword + quote + ": \"xxxxx\"";
            } else {
                // Query string: key=value -> key=xxxxx
                String keyword = matcher.group(4);
                replacement = keyword + "=xxxxx";
            }
            matcher.appendReplacement(result, Matcher.quoteReplacement(replacement));
        }
        matcher.appendTail(result);

        return result.toString();
    }
}

static class MyRoutes extends RouteBuilder {
    @Override
    public void configure() {
        from("direct:test")
                .to("log:maskedLogger?showAll=true&multiline=true")
                .log("*** FULL BODY (custom masking): ${body}");
    }
}

void main() throws Exception {
    SimpleRegistry registry = new SimpleRegistry();

    // Register custom formatter
    NonTruncatingMaskingFormatter formatter = new NonTruncatingMaskingFormatter();
    registry.bind(MaskingFormatter.CUSTOM_LOG_MASK_REF, formatter);

    DefaultCamelContext context = new DefaultCamelContext(registry);
    context.setLogMask(true);  // Enable globally
    context.addRoutes(new MyRoutes());
    context.start();

    // Query string payload - ALL fields masked
    String queryPayload = "userId=12345&password=pass123&apiKey=abc-xyz-123&ssn=123-45-6789&creditCard=4111-1111-1111-1111&token=jwt.abc.def&anotherPassword=secret456";
    context.createProducerTemplate().sendBody("direct:test", queryPayload);

    // Complex nested JSON - ALL fields masked
    String jsonPayload = """
        {
            "userId": "user123",
            "username": "john.doe",
            "password": "secretPass456",
            "email": "john@example.com",
            "apiKey": "sk-live-abc123xyz",
            "ssn": "987-65-4321",
            "creditCard": "5555-4444-3333-2222",
            "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9",
            "profile": {
                "accessToken": "ghp_xxxxxxxxxxxx",
                "refreshToken": "refresh_abc123"
            }
        }
        """;
    context.createProducerTemplate().sendBody("direct:test", jsonPayload);

    Thread.sleep(2000);
    context.stop();
}

Expected Output

Query string (ALL 7 fields masked):

*** FULL BODY (custom masking): userId=xxxxx&password=xxxxx&apiKey=xxxxx&ssn=xxxxx&creditCard=xxxxx&token=xxxxx&anotherPassword=xxxxx

JSON (ALL 10+ fields masked):

*** FULL BODY (custom masking): {
    "userId": "xxxxx",
    "username": "xxxxx",
    "password": "xxxxx",
    "email": "john@example.com",
    "apiKey": "xxxxx",
    "ssn": "xxxxx",
    "creditCard": "xxxxx",
    "token": "xxxxx",
    "profile": {
        "accessToken": "xxxxx",
        "refreshToken": "xxxxx"
    }
}

Key Advantages

Feature Built-in Custom
Truncation ❌ Sometimes ✅ Never
Custom keywords Limited ✅ Full control
Nested objects ❌ Limited ✅ Complete scan

Usage Patterns

Route-level:

from("direct:secure")
    .logMask()  // Uses your custom formatter
    .log("${body}");

Endpoint-level:

.to("log:secure?logMask=true")

Global: context.setLogMask(true) (as shown)

Apache Camel Error Handling: DefaultErrorHandler and onException

Apache Camel's robust error handling ensures reliable integration routes even when failures occur. The DefaultErrorHandler provides automatic retries and logging, while onException clauses offer precise control over specific exceptions.

Core Concepts

DefaultErrorHandler is Camel’s default strategy for all routes, featuring retry logic, detailed logging, and flexible recovery paths. It logs exceptions at ERROR level by default and supports configurable redeliveries without requiring dead letter channels.

onException clauses intercept specific exception types globally or per-route, with priority based on exception specificity—most-derived classes match first. Use handled(true) to stop propagation and transform responses, or continued(true) to resume the route.

DefaultErrorHandler Configuration

Set global retry policies early in RouteBuilder’s configure() method:

errorHandler(defaultErrorHandler()
    .maximumRedeliveries(3)
    .redeliveryDelay(2000)
    .retryAttemptedLogLevel(LoggingLevel.WARN)
    .logStackTrace(true));

Key options control retry behavior: maximumRedeliveries caps attempts, redeliveryDelay sets wait time (with exponential backoff), and retryAttemptedLogLevel adjusts retry logging verbosity. Per-route overrides nest within from() endpoints for granular control.

onException Best Practices

Place onException handlers before route definitions for global scope. Multiple handlers for the same exception type chain by priority, with conditions via onWhen():

onException(IOException.class)
    .handled(true)
    .log("🚨 IO EXCEPTION: ${exception.message}")
    .to("direct:ioErrorHandler");

onException(IllegalArgumentException.class)
    .onWhen(exchange -> "high".equals(exchange.getIn().getHeader("error.priority")))
    .handled(true)
    .log("⚠️ HIGH PRIORITY: ${exception.message}");

The handled(true) flag clears the exception and prevents DefaultErrorHandler retries unless continued(true) is specified. Route to dedicated error flows like direct:ioErrorHandler for consistent processing.

Example

This self-contained demo showcases layered error handling across exception types, conditional logic, and retry policies:

import org.apache.camel.*;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.impl.DefaultCamelContext;

public class CamelErrorHandlingDemo extends RouteBuilder {

    @Override
    public void configure() throws Exception {
        // Global DefaultErrorHandler with retries
        errorHandler(defaultErrorHandler()
            .maximumRedeliveries(3)
            .redeliveryDelay(2000)
            .retryAttemptedLogLevel(LoggingLevel.WARN)
            .logStackTrace(true));

        // Specific exception handlers (priority: most specific first)
        onException(java.io.IOException.class)
            .handled(true)
            .log("🚨 IO EXCEPTION HANDLED FIRST: ${exception.message}")
            .to("direct:ioErrorHandler");

        onException(IllegalArgumentException.class)
            .maximumRedeliveries(2)
            .redeliveryDelay(1000)
            .onWhen(exchange -> "high".equals(exchange.getIn().getHeader("error.priority")))
            .handled(true)
            .log("⚠️ HIGH PRIORITY ARG EXCEPTION: ${exception.message}")
            .to("direct:argErrorHandler");

        onException(IllegalArgumentException.class)
            .handled(true)
            .log("⚠️ DEFAULT ARG EXCEPTION: ${exception.message}")
            .to("direct:argErrorHandler");

        onException(Exception.class)
            .handled(true)
            .log("💀 GENERIC EXCEPTION: ${exception.message}")
            .to("direct:deadLetterQueue");

        // Main processing route
        from("direct:start")
            .routeId("mainRoute")
            .log("Processing: ${body}")
            .process(mainProcessor())
            .to("direct:success");

        from("direct:success").log("✅ SUCCESS: ${body}");
        from("direct:ioErrorHandler").log("📁 IO Error handled").setBody(constant("IO_PROCESSED"));
        from("direct:argErrorHandler").log("🔄 Arg error processed").setBody(constant("ARG_PROCESSED"));
        from("direct:deadLetterQueue").log("⚰️ DLQ: ${exception.stacktrace}").setBody(constant("DLQ_PROCESSED"));
    }

    private static Processor mainProcessor() {
        return exchange -> {
            String input = exchange.getIn().getBody(String.class);
            switch (input != null ? input : "") {
                case "io-fail":
                    exchange.getIn().setHeader("error.priority", "high");
                    throw new java.io.IOException("💾 Disk full");
                case "arg-high":
                    exchange.getIn().setHeader("error.priority", "high");
                    throw new IllegalArgumentException("❌ High priority invalid data");
                case "arg-normal":
                    throw new IllegalArgumentException("❌ Normal invalid data");
                case "generic-fail":
                    throw new RuntimeException("Unknown error");
                default:
                    exchange.getIn().setBody("✅ Processed: " + input);
            }
        };
    }

    public static void main(String[] args) throws Exception {
        try (CamelContext context = new DefaultCamelContext()) {
            context.getCamelContextExtension().setName("CamelErrorDemo");
            context.addRoutes(new CamelErrorHandlingDemo());
            context.start();

            ProducerTemplate template = context.createProducerTemplate();
            System.out.println("🚀 Camel Error Handling Demo...\n");

            template.sendBody("direct:start", "io-fail");      // → IO handler
            template.sendBody("direct:start", "arg-high");     // → High priority arg
            template.sendBody("direct:start", "arg-normal");   // → Default arg  
            template.sendBody("direct:start", "generic-fail"); // → Generic handler
            template.sendBody("direct:start", "good");         // → Success

            Thread.sleep(8000);
        }
    }
}

Expected Demo Output

textProcessing: io-fail
🚨 IO EXCEPTION HANDLED FIRST: 💾 Disk full
📁 IO Error handled

Processing: arg-high  
⚠️ HIGH PRIORITY ARG EXCEPTION: ❌ High priority invalid data
🔄 Arg error processed

Production Tips

Test all failure paths during development using predictable payloads like the demo. Monitor exchange.getProperty(Exchange.REDELIVERY_COUNTER) for retry stats. For complex flows, combine with doTry().doCatch() blocks for local exception handling within routes. This approach scales from simple REST APIs to enterprise integration patterns.

Dead Letter Channel in Apache Camel with Log Endpoint

Apache Camel's Dead Letter Channel (DLC) is an Enterprise Integration Pattern that gracefully handles failed messages by routing them to a designated endpoint, preventing route blockages. This article explores DLC configuration using a lightweight log endpoint—no JMS broker required—ideal for development, testing, or simple logging setups. You'll get a complete, runnable example to see it in action.

What is Dead Letter Channel?

The DLC activates when a message processing fails after exhausting retries. Unlike Camel's default error handler, which logs exceptions, DLC preserves the original input message and forwards it to a "dead letter" endpoint like a log, file, or queue. Key benefits include message isolation for later analysis and non-blocking route behavior.

It supports redelivery policies for automatic retries before final routing. This pattern shines in enterprise integration where failures (e.g., network issues, data validation errors) must not halt overall flow.

Why Use Log Endpoint?

The log component offers zero-setup error handling: it captures failures at ERROR level with full exchange details (body, headers, exception stack trace). Perfect for:

  • Quick debugging without external dependencies.
  • Console output in standalone apps or containers.
  • Custom formatting via options like showAll=true or multiline=true.

No database, file system, or broker needed—run it anywhere Camel supports logging (SLF4J/Logback by default).

Complete Working Example

This standalone Java app uses Camel Main to simulate failures via a timer-triggered route. It retries twice, then logs the failure.

Java DSL Code

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;

public class DeadLetterLogDemo extends RouteBuilder {
    @Override
    public void configure() {
        // Configure DLC with log endpoint
        errorHandler(deadLetterChannel("log:dead?level=ERROR&showAll=true&multiline=true")
            .useOriginalMessage()  // Preserves original input
            .maximumRedeliveries(2)
            .redeliveryDelay(1000)
            .retryAttemptedLogLevel(org.apache.camel.LoggingLevel.WARN));

        // Route that simulates failure
        from("timer:fail?period=5000&repeatCount=1")
            .log("🔥 Starting processing: ${body}")
            .process(exchange -> {
                // Simulate a runtime failure (e.g., bad data parsing)
                throw new RuntimeException("💥 Simulated processing error!");
            })
            .log("✅ Success log (won't reach here)");
    }

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.configure().addRoutesBuilder(new DeadLetterLogDemo());
        main.run(args);  // Runs until Ctrl+C
    }
}

Expected Console Output

🔥 Starting processing:
Failed delivery for (MessageId: 25B7DFF9FB26DD8-0000000000000000 on ExchangeId: 25B7DFF9FB26DD8-0000000000000000). On delivery attempt: 0 caught: java.lang.RuntimeException: 💥 Simulated processing error!
Failed delivery for (MessageId: 25B7DFF9FB26DD8-0000000000000000 on ExchangeId: 25B7DFF9FB26DD8-0000000000000000). On delivery attempt: 1 caught: java.lang.RuntimeException: 💥 Simulated processing error!
Failed delivery for (MessageId: 25B7DFF9FB26DD8-0000000000000000 on ExchangeId: 25B7DFF9FB26DD8-0000000000000000). On delivery attempt: 2 caught: java.lang.RuntimeException: 💥 Simulated processing error!
Exchange[
  Id: 25B7DFF9FB26DD8-0000000000000000
  RouteGroup: null
  RouteId: route1
  ExchangePattern: InOnly
  Properties: {CamelExceptionCaught=java.lang.RuntimeException: 💥 Simulated processing error!, CamelFailureRouteId=route1, CamelFatalFallbackErrorHandler=[route1], CamelToEndpoint=log://dead?level=ERROR&multiline=true&showAll=true}

Advanced Customizations

Enhance with processors for richer logging:

errorHandler(deadLetterChannel("log:dead?level=ERROR")
    .onExceptionOccurred(exchange -> {
        exchange.getIn().setHeader("failureTime", System.currentTimeMillis());
        exchange.getIn().setHeader("errorDetails", exchange.getException().getMessage());
    }));

Best Practices

  • Always pair with useOriginalMessage() to avoid losing input data.
  • Set maximumRedeliveries=0 for immediate DLC without retries in tests.
  • Monitor log volume in production; consider rotating to file or alerting.
  • Test edge cases: network timeouts, serialization errors, poison messages.
  • Global DLC via camelContext.setErrorHandlerFactory() for app-wide coverage.

This setup provides robust, observable error handling. Experiment by changing the failure to a real processor (e.g., invalid JSON parsing) for your use cases.

Apache Camel Transformer

Apache Camel's Transformer EIP enables declarative, type-safe message conversion between POJOs in routes. Below is a fully self-contained, single-file example using nested classes and Camel's Main class.

Complete Single-File Example

import org.apache.camel.Message;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.main.Main;
import org.apache.camel.spi.DataType;
import org.apache.camel.spi.Transformer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Apache Camel Transformer EIP example demonstrating declarative type-safe message conversion.
 * All classes (Lead, Customer, LeadToCustomerTransformer) are in this single file.
 */
public class TransformerDemo {
    private static final Logger LOG = LoggerFactory.getLogger(TransformerDemo.class);

    /**
     * Lead POJO - Input model
     */
    public static class Lead {
        private String name;
        private String company;
        private String city;

        public Lead() {}

        public Lead(String name, String company, String city) {
            this.name = name;
            this.company = company;
            this.city = city;
        }

        // Getters and setters
        public String getName() { return name; }
        public void setName(String name) { this.name = name; }
        public String getCompany() { return company; }
        public void setCompany(String company) { this.company = company; }
        public String getCity() { return city; }
        public void setCity(String city) { this.city = city; }

        @Override
        public String toString() {
            return "Lead{name='" + name + "', company='" + company + "', city='" + city + "'}";
        }
    }

    /**
     * Customer POJO - Output model
     */
    public static class Customer {
        private String fullName;
        private String organization;
        private String location;

        public Customer() {}

        public Customer(String fullName, String organization, String location) {
            this.fullName = fullName;
            this.organization = organization;
            this.location = location;
        }

        // Getters and setters
        public String getFullName() { return fullName; }
        public void setFullName(String fullName) { this.fullName = fullName; }
        public String getOrganization() { return organization; }
        public void setOrganization(String organization) { this.organization = organization; }
        public String getLocation() { return location; }
        public void setLocation(String location) { this.location = location; }

        @Override
        public String toString() {
            return "Customer{fullName='" + fullName + "', organization='" + organization + "', location='" + location + "'}";
        }
    }

    /**
     * Transformer implementation for converting Lead to Customer
     */
    public static class LeadToCustomerTransformer extends Transformer {
        @Override
        public void transform(Message message, DataType fromType, DataType toType) throws Exception {
            Lead lead = message.getMandatoryBody(Lead.class);
            Customer customer = new Customer(
                lead.getName().toUpperCase(),
                lead.getCompany(),
                lead.getCity()
            );
            message.setBody(customer);
        }
    }

    public static void main(String[] args) throws Exception {
        Main main = new Main();
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Register the transformer
                transformer()
                    .fromType("java:TransformerDemo$Lead")
                    .toType("java:TransformerDemo$Customer")
                    .withJava(LeadToCustomerTransformer.class);

                // Main route with automatic transformation
                from("direct:start")
                    .inputType("java:TransformerDemo$Lead")
                    .log("INPUT Lead: ${body}")
                    .to("direct:process")  // Triggers transformer automatically
                    .log("OUTPUT Customer: ${body}");

                from("direct:process")
                    .inputType("java:TransformerDemo$Customer")
                    .log("PROCESSED Customer: ${body}")
                    .to("log:final?showAll=true");
            }
        });

        // Add a startup listener to send test message after Camel starts
        main.addMainListener(new org.apache.camel.main.MainListenerSupport() {
            @Override
            public void afterStart(org.apache.camel.main.BaseMainSupport mainSupport) {
                try {
                    LOG.info("Sending test Lead message...");
                    mainSupport.getCamelContext().createProducerTemplate()
                        .sendBody("direct:start", new Lead("John Doe", "Acme Corp", "Auckland"));
                    LOG.info("Test message sent successfully!");
                } catch (Exception e) {
                    LOG.error("Failed to send test message", e);
                }
            }
        });

        // Run Camel (will run until Ctrl+C is pressed)
        main.run(args);
    }
}

Expected Console Output

Sending test Lead message...
INPUT Lead: Lead{name='John Doe', company='Acme Corp', city='Auckland'}
PROCESSED Customer: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}
Exchange[Id: D0B1A9A95984A67-0000000000000000, RouteGroup: null, RouteId: route2, ExchangePattern: InOnly, Properties: {CamelToEndpoint=log://final?showAll=true}, Headers: {}, BodyType: TransformerDemo.Customer, Body: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}]
OUTPUT Customer: Customer{fullName='JOHN DOE', organization='Acme Corp', location='Auckland'}
Test message sent successfully!

Key Implementation Details

Nested Class References: Uses TransformerDemo$Lead syntax to reference nested classes in data type strings.

Transformer Method Signature: Uses transform(Message message, DataType fromType, DataType toType)—the modern Camel 4.x+ API.

Automatic Startup Test: MainListenerSupport.afterStart() sends test message automatically when Camel context starts.

Single-File Design: All POJOs, transformer, and routes contained in one compilable .java file.

How the Transformation Triggers

  1. Input validationinputType("java:TransformerDemo$Lead") expects Lead
  2. Route boundaryto("direct:process") requires Customer input type
  3. Type mismatch → Camel matches Lead→Customer transformer automatically
  4. Conversion executesLeadToCustomerTransformer.transform() runs
  5. Type contract satisfied → Route continues with Customer

Apache Camel Consumer Types Explained

Apache Camel consumers initiate message processing in routes via the from() endpoint. They fall into two primary categories: event-driven and polling.

Core Classifications

Camel officially recognizes two consumer types—event-driven and polling—each suited to different message sources.

Type Description Mechanism
Event-Driven Reacts immediately to incoming events or invocations. No periodic checks. External push (e.g., JMS) or internal calls (e.g., direct). Uses transport-provided threads.
Polling Camel actively queries the endpoint at set intervals. Scheduled checks via ScheduledExecutorService. Configurable delay/initialDelay.

Event-Driven Consumers

These handle real-time messages without polling overhead. Examples include JMS (queue/topic events), HTTP/Netty (request triggers), and in-VM options like direct/SEDA.

  • Direct: Synchronous, blocking calls within the same CamelContext—like method invocation.
  • SEDA: Asynchronous queuing with backpressure control (bounded queue).

Example:

from("direct:start")  // Event-driven: waits for producerTemplate.sendBody()
    .log("Processed: ${body}");

Polling Consumers

Ideal for batch sources like files or databases. Camel polls periodically, ensuring ordered processing.

  • Uses fixed delays; supports repeat/repeatAttempt options.
  • Examples: file (watches directories), FTP/IMAP (remote checks).

Example:

from("file:input?delay=5000&noop=true")  // Polls every 5s
    .to("log:output");

Special Cases: In-VM Consumers

Direct and SEDA are event-driven subtypes for route chaining:

  • Direct: Sync, single-context.
  • SEDA: Async queue.

They differ from transport-based event-driven (e.g., JMS) by lacking external brokers but share passive reception.

When to Use Each

  • Event-Driven: Low-latency, continuous streams (pub/sub, APIs).
  • Polling: Reliable pulls from passive sources (files, legacy systems).

This covers Camel's consumer landscape for effective route design.

Apache Camel SEDA

Apache Camel SEDA implements the Staged Event-Driven Architecture pattern, enabling in-VM asynchronous messaging that decouples producers from consumers via BlockingQueue. This excels in high-load scenarios where synchronous endpoints like Direct would block threads—SEDA queues messages instead, boosting scalability with configurable concurrent consumers.

Core Advantages

  • Non-blocking Producers: Senders complete instantly while slow consumers process from the queue, preventing cascade failures.
  • Thread Pool Efficiency: Multiple consumers (concurrentConsumers=3) parallelize work without manual thread management.
  • Configurable Resilience: Options like queueSize, discardWhenFull, and offerTimeout handle overload gracefully.

Example

This standalone app uses Camel Main (Camel 4.x) with a custom ExchangeFormatter to visualize thread names, exchange IDs, and route context—clearly demonstrating SEDA's parallel consumer threads. Producers fire 5 messages rapidly (100ms intervals) into SEDA, while consumers lag with 1s delays; logs reveal immediate sends followed by staggered, multi-threaded processing.

import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.spi.ExchangeFormatter;

/**
 * Apache Camel example demonstrating SEDA (Staged Event-Driven Architecture) pattern.
 * Shows asynchronous message processing with concurrent consumers and queuing.
 */
public class SedaExample {

    static void main(String[] args) throws Exception {
        // Create a new Camel Main instance (using fully qualified name to avoid conflict)
        org.apache.camel.main.Main main = new org.apache.camel.main.Main();

        // Add routes with SEDA processing
        main.configure().addRoutesBuilder(new RouteBuilder() {
            @Override
            public void configure() {
                // Create a custom ExchangeFormatter for detailed log output
                ExchangeFormatter customFormatter = exchange ->
                        String.format("[Thread: %s] Body: %s | ExchangeId: %s | RouteId: %s",
                                Thread.currentThread().getName(),
                                exchange.getIn().getBody(String.class),
                                exchange.getExchangeId(),
                                exchange.getFromRouteId());

                // Register the custom formatter in the Camel registry
                getContext().getRegistry().bind("customFormatter", customFormatter);

                // SEDA endpoint: queueId=myQueue, concurrent consumers for parallelism
                from("seda:myQueue?concurrentConsumers=3")
                    .log("Processing: ${body}")
                    .delay(1000)  // Simulate slow consumer (1s delay)
                    .to("log:output?showAll=true&exchangeFormatter=#customFormatter");

                // Producer route for demo
                from("timer:tick?repeatCount=5&delay=100")  // Fire 5 msgs quickly
                    .setBody().simple("Msg ${exchangeId}")
                    .log("Sending: ${body}")
                    .to("seda:myQueue");
            }
        });

        // Run Camel (will run until Ctrl+C is pressed)
        main.run(args);
    }
}

Running and Verification

Compile and run the Java class directly (requires Camel 4.14.0 on classpath). Sample output shows the advantage:

Sending: Msg 1CB44DE50955685-0000000000000000     // Producer thread - instant
Sending: Msg 1CB44DE50955685-0000000000000001     // Producer continues rapidly
output - [Thread: Camel (camel-1) thread #6 - Delay] Body: Msg 1CB44DE50955685-0000000000000000 | ExchangeId: 1CB44DE50955685-0000000000000002 | RouteId: route1
output - [Thread: Camel (camel-1) thread #7 - Delay] Body: Msg 1CB44DE50955685-0000000000000001 | ExchangeId: 1CB44DE50955685-0000000000000004 | RouteId: route1  // Parallel consumers

Contrast with direct:myQueue: sends block behind delays on single thread. SEDA's queue absorbs bursts across threads, perfect for enterprise workloads like order processing.

« Older posts