SNS 对于大于当前最大值 256KB(最大为 2GB)的消息,可以通过Amazon SNS 扩展库来实现,具体的流程如下:
事先准备
- Pom maven依赖包
<dependency>
<groupId>software.amazon.sns</groupId>
<artifactId>sns-extended-client</artifactId>
<version>1.1.2</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-sns</artifactId>
<version>1.12.362</version>
</dependency>
- aws上创建 Amazon SQS队列

- aws上创建SNS 主题,并且订阅上面的队列。


- Java端 往SNS发消息的代码
package com.jtb.nucleus.aws.service;
import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
import com.amazonaws.regions.Regions;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3ClientBuilder;
import com.amazonaws.services.sns.AmazonSNS;
import com.amazonaws.services.sns.AmazonSNSClientBuilder;
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
import com.amazonaws.services.sqs.model.ReceiveMessageResult;
import software.amazon.sns.AmazonSNSExtendedClient;
import software.amazon.sns.SNSExtendedClientConfiguration;
public class Sns2SqsExample {
public static void main(String[] args) {
final String BUCKET_NAME = "laragemsg";
final Regions region = Regions.AP_NORTHEAST_1;
// Message threshold controls the maximum message size that will be allowed to
// be published
// through SNS using the extended client. Payload of messages exceeding this
// value will be stored in
// S3. The default value of this parameter is 256 KB which is the maximum
// message size in SNS (and SQS).
final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32;
// Initialize SNS, SQS and S3 clients
final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build();
final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build();
final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();
var topicsList = snsClient.listTopics();
var queueList = sqsClient.listQueues();
// Initialize SNS extended client
// PayloadSizeThreshold triggers message content storage in S3 when the
// threshold is exceeded
// To store all messages content in S3, use AlwaysThroughS3 flag
final SNSExtendedClientConfiguration snsExtendedClientConfiguration = new SNSExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, BUCKET_NAME)
.withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
final AmazonSNSExtendedClient snsExtendedClient = new AmazonSNSExtendedClient(snsClient,
snsExtendedClientConfiguration);
var topicArn = topicsList.getTopics().get(0).getTopicArn();
// Publish message via SNS with storage in S3
final String message = "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
snsExtendedClient.publish(topicArn, message);
// Initialize SQS extended client
final ExtendedClientConfiguration sqsExtendedClientConfiguration = new ExtendedClientConfiguration()
.withPayloadSupportEnabled(s3Client, BUCKET_NAME);
final AmazonSQSExtendedClient sqsExtendedClient = new AmazonSQSExtendedClient(sqsClient,
sqsExtendedClientConfiguration);
// Read the message from the queue
for (String url : queueList.getQueueUrls()) {
final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(url);
if (result.getMessages().size() > 0) {
System.out.println("Received message is " + result.getMessages().get(0).getBody());
}
}
}
}
- 在Java代码运行的机器上,通过aws configure设置好 aws账户的 access_id 和 access_key 和用户权限。





测试:
执行Java代码以后,消息正常发送到SNS服务,但是会发现SNS把消息作为文件上传到S3 Bucket中了。

SNS收到消息以后,会把消息转发到SQS队列,我们看一下SQS队列的消息。


可以看到SQS消息的正文,其实没有消息的本体,而是消息在S3存储的Key,通过在key,我们可以从S3中把消息取回来。
附:大消息 SNS->SQS_Lambda 流程说明

- ①:Java端通过「The Amazon SNS Extended Client Library for Java 」向SNS服务发送消息
- ②:SNS判断是LargeMessage的话, 会把消息本体存到S3 Bucket中。
- ③:SNS会收到S3文件的Key的信息
- ④:SNS会把S3文件的Key的信息,转送到SQS队列
- ⑤:SQS队列,会触发Lambda,Lambda从队列中取得情报
- ⑥:Lambda可以根据S3文件的Key信息,从S3中把消息本体文件取出来,做下一步处理。
官方Guide:https://docs.aws.amazon.com/sns/latest/dg/large-message-payloads.html
代码例子:https://github.com/awslabs/amazon-sns-java-extended-client-lib/