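Amazon SNS limits a message to 256 KB. Larger payloads (up to 2 GB) can be published with the Amazon SNS Extended Client Library for Java, which offloads the payload to S3. The procedure is as follows: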
Prerequisites
- Maven dependencies (pom.xml)
    <dependency>
        <groupId>software.amazon.sns</groupId>
        <artifactId>sns-extended-client</artifactId>
        <version>1.1.2</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-sns</artifactId>
        <version>1.12.362</version>
    </dependency>
- Create an Amazon SQS queue in AWS.
- Create an SNS topic in AWS and subscribe the queue above to it.
- Java code that publishes a message to SNS
    package com.jtb.nucleus.aws.service;

    // AmazonSQSExtendedClient lives in the separate amazon-sqs-java-extended-client-lib
    // artifact; make sure it (and the S3 and SQS SDK modules) is on the classpath.
    import com.amazon.sqs.javamessaging.AmazonSQSExtendedClient;
    import com.amazon.sqs.javamessaging.ExtendedClientConfiguration;
    import com.amazonaws.regions.Regions;
    import com.amazonaws.services.s3.AmazonS3;
    import com.amazonaws.services.s3.AmazonS3ClientBuilder;
    import com.amazonaws.services.sns.AmazonSNS;
    import com.amazonaws.services.sns.AmazonSNSClientBuilder;
    import com.amazonaws.services.sqs.AmazonSQS;
    import com.amazonaws.services.sqs.AmazonSQSClientBuilder;
    import com.amazonaws.services.sqs.model.ReceiveMessageResult;
    import software.amazon.sns.AmazonSNSExtendedClient;
    import software.amazon.sns.SNSExtendedClientConfiguration;

    public class Sns2SqsExample {

        public static void main(String[] args) {
            final String BUCKET_NAME = "laragemsg";
            final Regions region = Regions.AP_NORTHEAST_1;

            // Message threshold controls the maximum message size that will be allowed to
            // be published through SNS using the extended client. Payload of messages
            // exceeding this value will be stored in S3. The default value of this
            // parameter is 256 KB, which is the maximum message size in SNS (and SQS).
            final int EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD = 32;

            // Initialize SNS, SQS and S3 clients
            final AmazonSNS snsClient = AmazonSNSClientBuilder.standard().withRegion(region).build();
            final AmazonSQS sqsClient = AmazonSQSClientBuilder.standard().withRegion(region).build();
            final AmazonS3 s3Client = AmazonS3ClientBuilder.standard().withRegion(region).build();

            // Look up the existing topics and queues in the account
            var topicsList = snsClient.listTopics();
            var queueList = sqsClient.listQueues();

            // Initialize SNS extended client.
            // PayloadSizeThreshold triggers message content storage in S3 when the
            // threshold is exceeded.
            // To store all messages content in S3, use AlwaysThroughS3 flag.
            final SNSExtendedClientConfiguration snsExtendedClientConfiguration =
                    new SNSExtendedClientConfiguration()
                            .withPayloadSupportEnabled(s3Client, BUCKET_NAME)
                            .withPayloadSizeThreshold(EXTENDED_STORAGE_MESSAGE_SIZE_THRESHOLD);
            final AmazonSNSExtendedClient snsExtendedClient =
                    new AmazonSNSExtendedClient(snsClient, snsExtendedClientConfiguration);

            // Use the first topic in the account (the one created in the prerequisites)
            var topicArn = topicsList.getTopics().get(0).getTopicArn();

            // Publish message via SNS with storage in S3
            final String message =
                    "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
            snsExtendedClient.publish(topicArn, message);

            // Initialize SQS extended client
            final ExtendedClientConfiguration sqsExtendedClientConfiguration =
                    new ExtendedClientConfiguration()
                            .withPayloadSupportEnabled(s3Client, BUCKET_NAME);
            final AmazonSQSExtendedClient sqsExtendedClient =
                    new AmazonSQSExtendedClient(sqsClient, sqsExtendedClientConfiguration);

            // Read the message from each queue
            for (String url : queueList.getQueueUrls()) {
                final ReceiveMessageResult result = sqsExtendedClient.receiveMessage(url);
                if (!result.getMessages().isEmpty()) {
                    System.out.println("Received message is " + result.getMessages().get(0).getBody());
                }
            }
        }
    }
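The threshold of 32 in the code above is measured in bytes, so it helps to see how a message compares against it. Below is a minimal sketch of that comparison, assuming the check is simply "UTF-8 size of the body greater than the threshold". The class PayloadThresholdSketch and the method exceedsThreshold are hypothetical names used for illustration only and are not part of the library, and the real client may also count message attributes toward the size.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative only: approximates the size check, not the library's actual code. */
final class PayloadThresholdSketch {

    // Hypothetical helper: true when the UTF-8 encoded body is larger than the threshold.
    static boolean exceedsThreshold(String messageBody, int thresholdBytes) {
        return messageBody.getBytes(StandardCharsets.UTF_8).length > thresholdBytes;
    }

    public static void main(String[] args) {
        String message =
                "This message is stored in S3 as it exceeds the threshold of 32 bytes set above.";
        // The sample message from the article is longer than 32 bytes.
        System.out.println(exceedsThreshold(message, 32));   // prints "true"
        // A short body stays under the threshold and would be published inline.
        System.out.println(exceedsThreshold("short", 32));   // prints "false"
    }
}
```

Note that the size is counted in bytes, not characters, so non-ASCII text reaches the threshold sooner than its character count suggests.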
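- On the machine that runs the Java code, use aws configure to set the AWS account's access key ID and secret access key, and make sure the user has the required permissions.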
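Test:
After running the Java code, the message is published to SNS successfully, and you will find that the message body has been uploaded to the S3 bucket as a file (the upload is done by the extended client before it publishes).
Once SNS receives the message, it forwards it to the SQS queue. Let's look at the message in the queue.
The body of the SQS message does not contain the payload itself. It contains the key under which the payload is stored in S3, and with that key we can fetch the message back from S3.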
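To make the pointer concrete, here is a minimal sketch of pulling the bucket name and key out of such a body. It assumes the body is the raw pointer (raw message delivery enabled, or the SNS envelope already unwrapped) and that it has the JSON shape used in the sample string, which is my recollection of the payload-offloading pointer format, so check it against a real message from your queue. S3PointerSketch and its methods are hypothetical names, and the key value is made up.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Illustrative only: extracts bucket and key from an assumed pointer format. */
final class S3PointerSketch {

    private static final Pattern BUCKET =
            Pattern.compile("\"s3BucketName\"\\s*:\\s*\"([^\"]+)\"");
    private static final Pattern KEY =
            Pattern.compile("\"s3Key\"\\s*:\\s*\"([^\"]+)\"");

    static Optional<String> bucketName(String body) {
        return firstGroup(BUCKET, body);
    }

    static Optional<String> objectKey(String body) {
        return firstGroup(KEY, body);
    }

    // Returns the first capture group if the pattern occurs in the body.
    private static Optional<String> firstGroup(Pattern pattern, String body) {
        Matcher matcher = pattern.matcher(body);
        return matcher.find() ? Optional.of(matcher.group(1)) : Optional.empty();
    }

    public static void main(String[] args) {
        // Assumed body shape; "example-key" is a placeholder, not a real object key.
        String body = "[\"software.amazon.payloadoffloading.PayloadS3Pointer\","
                + "{\"s3BucketName\":\"laragemsg\",\"s3Key\":\"example-key\"}]";
        System.out.println(bucketName(body).orElse("(none)"));
        System.out.println(objectKey(body).orElse("(none)"));
    }
}
```

A regular expression keeps the sketch free of dependencies. In real code a JSON parser is the safer choice, and when the SQS extended client is used to receive the message, as in the Java example, it resolves the pointer for you.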
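Appendix: large-message flow, SNS -> SQS -> Lambda
- ①: The Java side sends a message to SNS through the Amazon SNS Extended Client Library for Java.
- ②: If the message is judged to be a large message (it exceeds the size threshold), the message body is stored in the S3 bucket by the extended client.
- ③: SNS receives the key information of the S3 file instead of the message body.
- ④: SNS forwards the key information of the S3 file to the SQS queue.
- ⑤: The SQS queue triggers Lambda, and Lambda reads the message from the queue.
- ⑥: Using the key information, Lambda can fetch the message body file from S3 and process it further.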
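The six steps above can be sketched as a small in-memory simulation that makes no AWS calls. A Map stands in for the S3 bucket and a Queue stands in for the SNS topic plus SQS queue. Every name here (LargeMessageFlowSketch, POINTER_PREFIX, publish, consume) is hypothetical, and the "s3-pointer:" marker is invented for the sketch and is not the format the library writes.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;
import java.util.UUID;

/** Illustrative only: an in-memory stand-in for the SNS -> SQS -> Lambda flow. */
final class LargeMessageFlowSketch {

    // Invented marker for this sketch; the real library uses its own pointer format.
    static final String POINTER_PREFIX = "s3-pointer:";

    final Map<String, String> bucket = new HashMap<>(); // stands in for the S3 bucket
    final Queue<String> queue = new ArrayDeque<>();     // stands in for SNS topic + SQS queue
    final int thresholdBytes;

    LargeMessageFlowSketch(int thresholdBytes) {
        this.thresholdBytes = thresholdBytes;
    }

    // Steps 1-4: offload a large body to the bucket and enqueue only its key.
    void publish(String message) {
        int size = message.getBytes(StandardCharsets.UTF_8).length;
        if (size > thresholdBytes) {
            String key = UUID.randomUUID().toString();
            bucket.put(key, message);
            queue.add(POINTER_PREFIX + key);
        } else {
            queue.add(message);
        }
    }

    // Steps 5-6: take the next body from the queue and resolve a pointer if present.
    // Throws NoSuchElementException when the queue is empty.
    String consume() {
        String body = queue.remove();
        if (body.startsWith(POINTER_PREFIX)) {
            return bucket.get(body.substring(POINTER_PREFIX.length()));
        }
        return body;
    }

    public static void main(String[] args) {
        LargeMessageFlowSketch flow = new LargeMessageFlowSketch(32);
        flow.publish("This message is stored in S3 as it exceeds the threshold of 32 bytes set above.");
        System.out.println("Queued body: " + flow.queue.peek());
        System.out.println("Resolved:    " + flow.consume());
    }
}
```

The point of the pattern is that the queue only ever carries a short key, so the size limit of SNS and SQS applies to the pointer rather than to the payload.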
Official guide: https://docs.aws.amazon.com/sns/latest/dg/large-message-payloads.html
Code example: https://github.com/awslabs/amazon-sns-java-extended-client-lib/