If you have ever worked on an application that uses cloud services, you probably know it’s a struggle to develop, debug or test locally without access to the service itself. Fortunately, over the last few years this has become much more approachable, as many cloud services have matured enough to gain proprietary emulators published by the cloud providers.

Recently, I joined a project where GCP Pub/Sub is used as the main messaging system for asynchronous communication. The software responsible for Pub/Sub connectivity is Spring Cloud Stream GCP, using the (by now legacy) annotation-based approach. One of the conventions established was to disable Pub/Sub support for local development and to mock Sink and Source in the tests. That’s fine unless:

  • you need to use Pub/Sub to trigger your application flow

  • you’re not comfortable using mocks in all kinds of tests

I’m not a big fan of mocks, especially when it comes to integration testing, where I usually treat them as a last resort. Therefore, after some research I was able to come up with a fairly simple solution, which allows me to work locally with GCP Pub/Sub with ease:

GCP Pub/Sub Emulator

GCP Pub/Sub Emulator plays the key part in the solution. It emulates the production (cloud) version of the Pub/Sub service, with certain limitations:

  • UpdateTopic and UpdateSnapshot RPCs are not supported.

  • IAM operations are not supported.

  • Configurable message retention is not supported; all messages are retained indefinitely.

  • Retry policies are not supported.

  • Subscription expiration is not supported. Subscriptions do not expire.

  • Filtering is not supported.

The lack of retry policies is the most problematic limitation, because in case of an exception the Pub/Sub subscriber would get stuck in an infinite redelivery loop. We’ll tackle that later on.

All the basics (installation, running, etc.) are pretty well explained in the official documentation. I’d like to focus on the ease of use aspect.

Docker is your friend

I think it’s no surprise that Docker is mentioned here. With Docker, you can abstract away all the less important details (dependencies, installation process) of the service you want to run. Hence, it comes in very handy for testing (with the great Testcontainers library) or local development.
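
To give an example: in an integration test the emulator can be started and wired into Spring with the Testcontainers GCloud module. The sketch below assumes JUnit 5 and uses the official cloud-sdk emulators image; the test class name is just an example:

PubSubEmulatorIntegrationTest.java
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.DynamicPropertyRegistry;
import org.springframework.test.context.DynamicPropertySource;
import org.testcontainers.containers.PubSubEmulatorContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;
import org.testcontainers.utility.DockerImageName;

@SpringBootTest
@Testcontainers
class PubSubEmulatorIntegrationTest {

    // Throwaway emulator instance managed by Testcontainers for the test lifetime.
    @Container
    static final PubSubEmulatorContainer EMULATOR = new PubSubEmulatorContainer(
            DockerImageName.parse("gcr.io/google.com/cloudsdktool/cloud-sdk:emulators"));

    // Point Spring Cloud GCP at the emulator instead of the real service.
    @DynamicPropertySource
    static void pubSubProperties(DynamicPropertyRegistry registry) {
        registry.add("spring.cloud.gcp.pubsub.emulator-host", EMULATOR::getEmulatorEndpoint);
    }

    // ... test methods exercising the Pub/Sub-triggered flow
}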

I have created a Pub/Sub Emulator Docker image, which is available on Docker Hub. If you’d like to build the image yourself, the source code is available on GitHub. To run the Pub/Sub Emulator with sensible defaults (where the local Project ID is the most important one), run the following Docker command:
docker run -d -p 8085:8085 poznachowski/gcp-pubsub-emulator:latest
If more dependencies are required to set up the local environment, it makes sense to wrap them into a single docker-compose file:

docker-compose.yml
services:
  pubsub_emulator:
    image: poznachowski/gcp-pubsub-emulator:latest
    ports:
    - 8085:8085

Spring Boot / Cloud configuration

Now, we need to tell Spring to use the emulated version of Pub/Sub. To do this, we will make use of an additional local profile with the following configuration:

application-local.yml
spring:
  cloud:
    gcp:
      pubsub:
        enabled: true
        emulator-host: localhost:8085 (1)
        project-id: local (2)
    stream:
      gcp:
        pubsub:
          default:
            consumer:
              ack-mode: AUTO_ACK (3)
              auto-create-resources: true (4)
            producer:
              auto-create-resources: true (4)
1 Address of the deployed Pub/Sub Emulator.
2 GCP Project ID used in the application; we need to provide the same value as in the Emulator.
3 We need to change the ack mode to AUTO_ACK to disable the automatic nack.
4 We need to make sure that Spring Cloud creates the required Pub/Sub topology in the Emulator.
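
For context, with the legacy annotation-based model mentioned earlier, the auto-created topology is derived from the binding’s destination (the topic) and group (the subscription, which the binder names <destination>.<group>). A minimal consumer could look like the sketch below; the binding properties, class name and payload handling are hypothetical:

ProjectsDeleteListener.java
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;

// With spring.cloud.stream.bindings.input.destination=projects.delete and
// spring.cloud.stream.bindings.input.group=my-group, the binder auto-creates
// the "projects.delete" topic and the "projects.delete.my-group" subscription
// in the emulator (thanks to auto-create-resources: true).
@EnableBinding(Sink.class)
public class ProjectsDeleteListener {

    @StreamListener(Sink.INPUT)
    public void handle(String payload) {
        // application flow triggered by the incoming Pub/Sub message
        System.out.println("Received: " + payload);
    }
}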

Setting up error handler

In the configuration above we set the ack mode to AUTO_ACK. Otherwise, in case of any exception being thrown, the Pub/Sub Emulator would redeliver the message infinitely. Now no action is taken on exception, which means we still fall under the ack deadline - and when it is reached, Pub/Sub will still try to redeliver. To make sure that a message is delivered only once, regardless of the processing result, we need to set up an error handler. It will handle the error message by simply logging it. This means the handling completes successfully, and the message will be ack’ed automatically by the framework.

For a single subscription it is as easy as adding a @ServiceActivator to an error channel:

// Note that the error `inputChannel` is formatted as [Pub/Sub subscription name].errors
// or the equivalent of [Pub/Sub topic name].[group name].errors. If you change the topic name in
// `application.properties`, you will also have to change the `inputChannel` below.
@ServiceActivator(inputChannel = "my-topic.my-group.errors")
public void error(Message<MessagingException> message) {
    LOGGER.error("The message that was sent is now processed by the error handler.");
    LOGGER.error("Failed message: " + message.getPayload().getFailedMessage());
}

For multiple subscriptions this would become a maintenance hell pretty quickly. Fortunately, a better alternative exists. With the Spring Cloud Stream GCP 1.x implementation, however, it comes with a catch. We need to pull out the default PubSubBinderConfiguration and provide a slightly modified version:

Spring Cloud GCP 1.x PubSubLocalConfiguration.java
@Slf4j
@Configuration
@Profile("local")
@EnableConfigurationProperties(PubSubExtendedBindingProperties.class)
public class PubSubLocalConfiguration {

    @Bean
    public PubSubChannelProvisioner pubSubChannelProvisioner(PubSubAdmin pubSubAdmin) {
        return new PubSubChannelProvisioner(pubSubAdmin);
    }

    @Bean
    public PubSubMessageChannelBinder pubSubBinder(
            PubSubChannelProvisioner pubSubChannelProvisioner,
            PubSubTemplate pubSubTemplate,
            PubSubExtendedBindingProperties pubSubExtendedBindingProperties) {

        var binder = new PubSubMessageChannelBinder(null,
                pubSubChannelProvisioner,
                pubSubTemplate,
                pubSubExtendedBindingProperties
        );
        binder.setConsumerEndpointCustomizer((endpoint, destinationName, group) -> { (1)
            var channelAdapter = (PubSubInboundChannelAdapter) endpoint;
            var channel = new PublishSubscribeChannel();
            channel.subscribe(m -> log.error("Error handled: ", ((ErrorMessage)m).getPayload()));
            channelAdapter.setErrorChannel(channel);
        });
        return binder;
    }

    @Bean
    public BindingHandlerAdvise.MappingsProvider pubSubExtendedPropertiesDefaultMappingsProvider() {
        return () -> Collections.singletonMap(
                ConfigurationPropertyName.of("spring.cloud.stream.gcp.pubsub.bindings"),
                ConfigurationPropertyName.of("spring.cloud.stream.gcp.pubsub.default"));
    }
}
1 The main difference from the original configuration: here we provide a ConsumerEndpointCustomizer that declares a dedicated error channel, which simply logs the error and completes without any exception being thrown.

Since Spring Cloud Stream GCP 2.0.0, you can provide a sole ConsumerEndpointCustomizer bean to customize the Pub/Sub binder. Hence, the solution trims down to:

Spring Cloud GCP 2.x PubSubLocalConfiguration.java
@Slf4j
@Configuration
@Profile("local")
public class PubSubLocalConfiguration {

    @Bean
    public ConsumerEndpointCustomizer<PubSubInboundChannelAdapter> consumerCustomizer() {
        return (endpoint, destinationName, group) -> {
            var channel = new PublishSubscribeChannel();
            channel.subscribe(m -> log.error("Error handled: ", ((ErrorMessage)m).getPayload()));
            endpoint.setErrorChannel(channel);
        };
    }
}

Running everything together

  1. Run the Pub/Sub Emulator Docker image

  2. Run the application with the local profile

  3. Trigger Pub/Sub entry points with the Cloud Pub/Sub API:

    1. After the application starts, you can check if topics have been created by calling [GET]http://localhost:8085/v1/projects/local/topics

    2. To publish a message to a particular topic call [POST]http://localhost:8085/v1/projects/local/topics/topic_name:publish

Let’s assume that we want to send:

{
  "projectId": "3454DF"
}

payload to the projects.delete topic. The Pub/Sub message contract states that we need to Base64-encode our payload: ewogICJwcm9qZWN0SWQiOiAiMzQ1NERGIgp9, hence we should send:

{
  "messages":[
    {
      "attributes":{
      },
      "data":"ewogICJwcm9qZWN0SWQiOiAiMzQ1NERGIgp9"
    }
  ]
}

to [POST] http://localhost:8085/v1/projects/local/topics/projects.delete:publish.
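
For reference, the Base64 value above can be produced with plain Java (the class name is just an example):

Base64Payload.java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class Base64Payload {
    public static void main(String[] args) {
        String json = "{\n  \"projectId\": \"3454DF\"\n}";
        // Prints: ewogICJwcm9qZWN0SWQiOiAiMzQ1NERGIgp9
        System.out.println(Base64.getEncoder().encodeToString(json.getBytes(StandardCharsets.UTF_8)));
    }
}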

At this point, we should see that the HTTP call triggered the application flow. What is more, in case of any exception being thrown, the Pub/Sub Emulator won’t redeliver the message.

Unfortunately, the Pub/Sub REST API does not support message retrieval directly from a topic. To verify whether a message was published to a given topic, I would suggest creating a dedicated subscriber, whether in the local configuration or in an integration test, and logging or asserting the incoming message(s).
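
A minimal sketch of such a subscriber in the local configuration could look as follows. It uses PubSubAdmin and PubSubTemplate (2.x package names; in 1.x they live under org.springframework.cloud.gcp.pubsub), the topic and subscription names are hypothetical, and the topic is assumed to already exist (e.g. auto-created by the binder):

PubSubVerificationConfiguration.java
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.cloud.spring.pubsub.PubSubAdmin;
import com.google.cloud.spring.pubsub.core.PubSubTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;

@Slf4j
@Configuration
@Profile("local")
public class PubSubVerificationConfiguration {

    @Bean
    public Subscriber verificationSubscriber(PubSubAdmin pubSubAdmin, PubSubTemplate pubSubTemplate) {
        // Create an extra subscription on the topic under inspection, so published
        // messages can be observed locally without touching the real consumers.
        if (pubSubAdmin.getSubscription("projects.delete.verification") == null) {
            pubSubAdmin.createSubscription("projects.delete.verification", "projects.delete");
        }
        // Log (and ack) every message that lands on the verification subscription.
        return pubSubTemplate.subscribe("projects.delete.verification", message -> {
            log.info("Verification - received: {}", message.getPubsubMessage().getData().toStringUtf8());
            message.ack();
        });
    }
}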