
AWS SNS SQS Large Message Tips

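Amazon SNS currently limits a message to 256 KB. Larger messages (up to 2 GB) can be published with the Amazon SNS Extended Client Library, which stores the payload in Amazon S3. The procedure is as follows: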

Prerequisites


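  • Maven dependencies (pom.xml). The sample code below also imports the SQS extended client and the SQS and S3 SDK clients; if your build does not resolve them transitively, add com.amazonaws:amazon-sqs-java-extended-client-lib and the matching aws-java-sdk modules as well.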
<dependency>
    <groupId>software.amazon.sns</groupId>
    <artifactId>sns-extended-client</artifactId>
    <version>1.1.2</version>
</dependency>
		
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-sns</artifactId>
    <version>1.12.362</version>
</dependency>
  • Create an Amazon SQS queue in AWS.
  • Create an SNS topic in AWS and subscribe the queue above to it.
  • Create an S3 bucket in AWS.
    • Java code that publishes a message to SNS
    package com.jtb.nucleus.aws.service;
    
    import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
    import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.sns.AmazonSNS;
    import com.amazonaws.services.sns.AmazonSNSClientBuilder;
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.ReceiveMessageResult;
    
    import software.amazon.sns.AmazonSNSExtendedClient;
    import software.amazon.sns.SNSExtendedClientConfiguration;
    
    public class Sns2SqsExample {
    
    	public static void main(String[] args) {
    		final String BUCKET_NAME = "laragemsg";
    		final Regions region = Regions.AP_NORTHEAST_1;
    
    		// Size threshold in bytes. When a message exceeds this value, the extended
    		// client stores its payload in S3 instead of sending it through SNS directly.
    		// The default is 256 KB, which is the maximum message size in SNS (and SQS).
    		// It is set to 32 bytes here so that the short test message below is offloaded.
    		final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32;
    
    		// Initialize SNS, SQS and S3 clients
    		final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build();
    		final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build();
    		final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();
    
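    		// For simplicity, this example looks up the topics and queues that already
    		// exist in the region and later uses the first topic and polls every queue.
    		var topicsList = snsClient.listTopics();
    		var queueList = sqsClient.listQueues();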
    
    
    		// Initialize the SNS extended client.
    		// PayloadSizeThreshold triggers storage of the message content in S3 when
    		// the threshold is exceeded.
    		// To store the content of all messages in S3, use the AlwaysThroughS3 flag.
    		final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
    				.withPayloadSupportEnabled(s3Client, BUCKET_NAME)
    				.withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
    		final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient,
    				snsExtendedClientConfiguration);
    
    		var topicArn = topicsList.getTopics().get(0).getTopicArn();
    		// Publish message via SNS with storage in S3
    		final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
    		snsExtendedClient.publish(topicArn, message);
    
    		// Initialize SQS extended client
    		final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration()
    				.withPayloadSupportEnabled(s3Client, BUCKET_NAME);
    		final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient,
    				sqsExtendedClientConfiguration);
    
    		// Read the message from each queue. The SQS extended client resolves the
    		// S3 reference and returns the original payload as the message body.
    		for (String url : queueList.getQueueUrls()) {
    			final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(url);
    			if (!result.getMessages().isEmpty()) {
    				System.out.println("Received message is " + result.getMessages().get(0).getBody());
    			}
    		}
    	}
    }
    
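    • On the machine that runs the Java code, use aws configure to set the access key ID and secret access key of the AWS account, and make sure the user has the required permissions.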
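The threshold in the example above is given in bytes, not characters, which matters for non-ASCII text. The following is a minimal sketch of that size check, assuming the payload is measured as UTF-8 bytes and offloaded only when it is strictly larger than the threshold. The class and method names are hypothetical, and the real library may also count message attributes, which this sketch ignores.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the AWS libraries.
class PayloadSizeCheck {

    // Same value as EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD in the example above.
    static final int THRESHOLD_BYTES = 32;

    // Size of the message body when encoded as UTF-8 (assumed encoding).
    static int sizeInBytes(String message) {
        return message.getBytes(StandardCharsets.UTF_8).length;
    }

    // Assumption: the payload goes to S3 only when it exceeds the threshold.
    static boolean wouldOffloadToS3(String message, int thresholdBytes) {
        return sizeInBytes(message) > thresholdBytes;
    }

    public static void main(String[] args) {
        // 19 ASCII characters, 19 bytes: stays below the threshold.
        String ascii = "short ASCII message";

        // 12 CJK characters, 3 bytes each in UTF-8, 36 bytes in total.
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 6; i++) {
            sb.append("\u6d88\u606f");
        }
        String cjk = sb.toString();

        System.out.println(sizeInBytes(ascii) + " bytes, offload: "
                + wouldOffloadToS3(ascii, THRESHOLD_BYTES));
        System.out.println(sizeInBytes(cjk) + " bytes, offload: "
                + wouldOffloadToS3(cjk, THRESHOLD_BYTES));
    }
}
```

A 12-character string can therefore already exceed a 32-byte threshold, so it is safer to size the threshold against the encoded payload than against the string length.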

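    Test:


    After the Java code runs, the message is published to SNS as usual, but you will find that the SNS extended client has uploaded the message body to the S3 bucket as a file.

    Once SNS receives the message, it forwards it to the SQS queue. Let's look at the message in the queue.

    The body of the SQS message does not contain the actual payload. It contains the key under which the payload is stored in S3, and with that key we can fetch the message back from S3.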
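To fetch the payload by hand, the bucket name and key first have to be extracted from the SQS message body. The following is a minimal sketch, assuming the body is the JSON pointer written by the extended client in the form ["software.amazon.payloadoffloading.PayloadS3Pointer", {"s3BucketName": ..., "s3Key": ...}] and that it has already been unwrapped from any SNS envelope. The class names and the sample key are hypothetical, and a real JSON parser would be more robust than the regular expressions used here.

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of the AWS libraries.
class S3PointerParser {

    // Simple holder for the two fields of the pointer.
    static final class S3Pointer {
        final String bucket;
        final String key;

        S3Pointer(String bucket, String key) {
            this.bucket = bucket;
            this.key = key;
        }
    }

    private static final Pattern BUCKET =
            Pattern.compile("\"s3BucketName\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern KEY =
            Pattern.compile("\"s3Key\"\\s*:\\s*\"([^\"]+)\"");

    // Extracts bucket and key; rejects bodies that are not an S3 pointer.
    static S3Pointer parse(String body) {
        Matcher bucket = BUCKET.matcher(body);
        Matcher key = KEY.matcher(body);
        if (!bucket.find() || !key.find()) {
            throw new IllegalArgumentException("Not an S3 pointer: " + body);
        }
        return new S3Pointer(bucket.group(1), key.group(1));
    }

    public static void main(String[] args) {
        // Sample body with a made-up key, following the assumed pointer format.
        String body = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\","
                + "{\"s3BucketName\":\"laragemsg\",\"s3Key\":\"example-key\"}]";
        S3Pointer pointer = parse(body);
        System.out.println(pointer.bucket + " / " + pointer.key);
        // With the AWS SDK for Java 1.x, the payload could then be read with
        // s3Client.getObjectAsString(pointer.bucket, pointer.key).
    }
}
```

When the queue is read through AmazonSQSExtendedClient, as in the example code, this lookup is done by the library and the parsing step is not needed.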

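    Appendix: large message flow, SNS -> SQS -> Lambda


    • ②: If the message is judged to be a large message (by the SNS extended client on the publisher side), its body is stored in the S3 bucket.
    • ③: SNS receives the key of the S3 file instead of the message body.
    • ④: SNS forwards the key of the S3 file to the SQS queue.
    • ⑤: The SQS queue triggers Lambda, and Lambda reads the information from the queue.
    • ⑥: Lambda uses the key of the S3 file to fetch the message body from S3 and continues with the next processing step.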
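Steps ② to ⑥ can be sketched as a small in-memory simulation. This is only an illustration of the pattern, not the AWS implementation: a Map stands in for the S3 bucket, a Queue for the SQS queue, the consume method for the Lambda function, and the "S3PTR:" marker is a hypothetical stand-in for the real pointer format.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

// Hypothetical in-memory model of the flow; no AWS calls are made.
class LargeMessageFlowSketch {

    // Hypothetical marker that identifies a queue entry as a pointer.
    static final String POINTER_PREFIX = "S3PTR:";

    final Map<String, String> bucket = new HashMap<>(); // stands in for S3
    final Queue<String> queue = new ArrayDeque<>();     // stands in for SQS
    final int thresholdBytes;

    LargeMessageFlowSketch(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Publisher side: steps 2 to 4.
    void publish(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        if (size > thresholdBytes) {
            String key = UUID.randomUUID().toString();
            bucket.put(key, message);        // step 2: body goes to the bucket
            queue.add(POINTER_PREFIX + key); // steps 3 and 4: only the key travels
        } else {
            queue.add(message);              // small messages travel unchanged
        }
    }

    // Consumer side (the Lambda function in the flow): steps 5 and 6.
    String consume() {
        String body = queue.poll();          // step 5: read from the queue
        if (body == null) {
            return null;                     // nothing to process
        }
        if (body.startsWith(POINTER_PREFIX)) {
            // step 6: resolve the key against the bucket
            return bucket.get(body.substring(POINTER_PREFIX.length()));
        }
        return body;
    }

    public static void main(String[] args) {
        LargeMessageFlowSketch flow = new LargeMessageFlowSketch(32);
        flow.publish("This message is stored in S3 as it exceeds the threshold of 32 bytes set above.");
        System.out.println("Queue holds: " + flow.queue.peek());
        System.out.println("Consumer sees: " + flow.consume());
    }
}
```

The point of the design is that the queue only ever carries a small reference, so the size limits of SNS and SQS apply to the pointer and not to the payload itself.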

    Official guide: https://docs.aws.amazon.com/sns/latest/dg/large-message-payloads.html

    Sample code: https://github.com/awslabs/amazon-sns-java-extended-client-lib/