京东 JCQ 消息服务订阅

正常订阅流程

1、正常情况下。

  • 登录 京东-商家开放控制台
  • 选择应用 -> 消息服务 -> 订阅消息 -> 输入主题名称 -> 点击查询 -> 选择订阅即可
  • 输入信息(云账号,key ID,key Secret,选择区域),确认授权订阅即可。
    1
    2
    云账号:参考 <https://uc.jdcloud.com/account/basic-info> 【账户名】
    key ID/key Secret:参考 <https://uc.jdcloud.com/account/accesskey> 【没有的话,就创建】

其中的坑

2、但是其中有坑。

跨区域问题

  • 若你的云主机是上海的,就不能订阅区域为北京的 topic 主题,只能订阅上海的 topic 主题(我就是这个问题)
    1
    2
    3
    4
    5
    6
    7
    怎么查看你的主机所在区域:参考 <https://yd-console.jdcloud.com/#/vm/host/list>
    【基础云服务 -> 云主机 -> 实例】
    这里可以选择四个,看看你的主机在哪个里面,就是你主机所在的区域:
    1、华北-北京
    2、华南-广州
    3、华东-宿迁
    4、华东-上海

postman请求订阅

  • 根据上面可以了解到,你是上海的,订阅的时候只能选择北京,那样是不可以的。因为我咨询客服了,他说跨区域是不能通信的。只能通过 curl 发送请求来订阅,这里我使用的是 postman
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    POST
    https://open-console.jd.com/messageSubscribe/addUserAKAndSub

    + Headers
    cookie xxx

    + Body -> raw -> JSON(各个参数,下面会讲解)
    {
    "appkey":"...C68",
    "topicId":"165",
    "accessKeyId":"...EE8",
    "accessKeySecret":"...349",
    "jdCloudPin":"...",
    "clusterRegion":"cn-east-2",
    "queueServiceType":"JCQ"
    }

参数获取

Headers:cookie

1、登录 商家开放控制台
2、右键检查
3、选择 Network(网络)
4、此时,刷新一下当前页面
5、Name 选中,查看右边 “Headers -> Request Headers -> cookie”,把对应的值,全部选中复制
在这里插入图片描述

json body

  • json body:appkey
    登录 京东开放平台,选择“概览”,即可查看“AppKey”
  • json body:topicId
    这里说了,是id,
    例如:POP订单备注变更 https://open.jd.com/home/home#/doc/msgApi?apiCateId=81&apiId=165
    其中,url 中 apiId 就是要填写的"topicId":"165"
  • json body:accessKeyId/accessKeySecret
    前面已经说了,地址 Access Key管理,没有就创建一个即可
  • json body:jdCloudPin
    对应,基本资料 中的账户名
  • json body:clusterRegion
    因为我是上海的云主机,所以我要订阅上海的 JCQ topic 主题。这里填写 "clusterRegion":"cn-east-2"
    若你要订阅其他地区,可以咨询客服:支持中心,选择“立即提问”,问题类型选择“JDO-消息服务、数据推送 -> 消息服务、数据推送接入问题”。
    例如:假设你是北京的主机,就说:“你好,我是北京的云主机,想订阅 topic xxx,订阅时,没有区域为北京的选项,clusterRegion 区域应该填写什么?”。
    正常上班的情况下,那边过会就会回复。
  • json body:queueServiceType
    固定写法,"queueServiceType":"JCQ"

查看是否订阅成功

发送成功,登录 商家开发控制台,选择应用,“消息服务 -> 我的订阅”,若显示订阅失败,请与客服联系。联系客服的方式,上面“json body:clusterRegion”已经提到。就说:“显示订阅失败,你看一下。”,然后给出订阅失败的截图,给出 请求体。

JDK 使用

JDK 使用(待整理)
2021-07-07 10:14:49 补

1、导入 pom 依赖

1
2
3
4
5
6
7
<!-- 京东云鼎消息队列 start -->
<dependency>
<groupId>com.jdcloud</groupId>
<artifactId>jcq-java-sdk</artifactId>
<version>1.3.1.Final</version>
</dependency>
<!-- 京东云鼎消息队列 end -->

2、代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
package com.decathlon.sfc.order.bff.core.thread;

import com.decathlon.sfc.order.bff.common.constant.MsgCode;
import com.decathlon.sfc.order.bff.common.exception.BizException;
import com.decathlon.sfc.order.bff.core.entity.task.Task;
import com.decathlon.sfc.order.bff.core.util.JsonUtil;
import com.fasterxml.jackson.databind.JsonNode;
import com.jcloud.jcq.client.Exception.ClientException;
import com.jcloud.jcq.client.consumer.ConsumeResult;
import com.jcloud.jcq.client.consumer.MessageListener;
import com.jcloud.jcq.protocol.Message;
import com.jcloud.jcq.sdk.JCQClientFactory;
import com.jcloud.jcq.sdk.auth.UserCredential;
import com.jcloud.jcq.sdk.consumer.Consumer;
import com.jcloud.jcq.sdk.consumer.ConsumerConfig;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.concurrent.locks.LockSupport;

/**
* 京东云鼎 消息队列 消费者线程
* JCQ = JD Cloud Queue
*/
@Slf4j
public class JcqTaskThread extends BaseTaskThread {

/**
* 京东 JCQ 访问参数对象
*/
private JcqParams jcqParams;

/**
* Task Object,封装了一些必要的数据
*/
private final Task task;
/**
* Jcq 消费者
*/
private Consumer consumer;

public JcqTaskThread(Task task) {
this.task = task;
init();
}

/**
* 进行一些必要的初始化
*/
private void init() {
// 1、获取配置,解析 JcqParams 对象
String sourceCfg = task.getInputCfg().getSourceCfg();
try {
this.jcqParams = JsonUtil.decode(sourceCfg, JcqParams.class);
} catch (IOException e) {
throw new BizException(MsgCode.REQUEST_FAIL_PARAM_ERR.getCode(), "InputCfg SourceCfg JSON not match");
}
}

@Override
protected void bizFunc() {
UserCredential userCredential = new UserCredential(jcqParams.getAccessKey(), jcqParams.getSecretKey());
ConsumerConfig consumerConfig = ConsumerConfig.builder()
.consumerGroupId(jcqParams.getConsumerGroupId())
.metaServerAddress(jcqParams.getMetaServerAddress())
.build();

// 1、创建 Consumer
try {
this.consumer = JCQClientFactory.getInstance().createConsumer(userCredential, consumerConfig);
} catch (ClientException e) {
throw new BizException(MsgCode.SERVER_ERR.getCode(), String.format("创建 JcqTaskThread 失败... JcqParams = %s", jcqParams.toString()));
}

// 2、创建 Listener
MessageListener listener = list -> {
log.info("topic={},监听到消息{}条", jcqParams.getTopic(), list.size());
for (Message message : list) {
String body = new String(message.getBody());
log.info(">>> Topic {} >>> QueueId {} >>> MessageId {} >>> Body {}", message.getTopic(), message.getQueueId(), message.getMessageId(), body);
handleTask(body);
}
return ConsumeResult.SUCCESS;
};

// 3、订阅 Topic
try {
this.consumer.subscribeTopic(jcqParams.getTopic(), listener, null);
} catch (ClientException e) {
throw new BizException(MsgCode.SERVER_ERR.getCode(), String.format("订阅 topic 失败... JcqParams = %s", jcqParams.toString()));
}

// 4、开启 Consumer => 开始消费
try {
consumer.start();
} catch (ClientException e) {
throw new BizException(MsgCode.SERVER_ERR.getCode(), String.format("consumer.start() 启动消费者失败... JcqParams = %s", jcqParams.toString()));
}

// 5、挂起当前线程(防止 run 方法结束,线程退出)
LockSupport.park();
}


/**
* close the task if needed
*/
@Override
public void close() {
// 1、关闭 Consumer(关闭相关资源)
try {
this.consumer.shutdown();
} catch (ClientException e) {
log.warn("关闭 JcqTaskThread 失败...", e);
}

// 2、唤醒线程(run方法结束 => 退出线程)
LockSupport.unpark(this);
}

/**
* 处理接收到的数据 -> 转换 -> 发送到下游
*
* @param inputData 上游接收到的数据
*/
private void handleTask(String inputData) {
try {
// 0. Json => Object
JsonNode inputDataNode = JsonUtil.decodeJson(inputData);

// 1. 传入 inputData
task.setInputData(inputDataNode);

// 2. 转换 inputData -> outputData
task.executeMapping();

// 3. 把 outputData -> 下游
task.executeOutput();
} catch (Exception e) {
log.warn("topic={} 处理业务异常...", jcqParams.getTopic(), e);
}

}


/**
* 京东 JCQ 访问参数对象
*/
@Data
public static class JcqParams {
/**
* 用户accessKey
*/
private String accessKey;
/**
* 用户secretKey
*/
private String secretKey;
/**
* 元数据服务器地址
*/
private String metaServerAddress;
/**
* topic名称
*/
private String topic;
/**
* 消费组Id
*/
private String consumerGroupId;
}
}

3、京东云鼎查看 topic
https://yd-jcq-console.jdcloud.com/topics

4、查看订阅信息
登录 商家开发控制台,选择应用,“消息服务 -> 我的订阅”,若显示订阅失败,请与客服联系。联系客服的方式,上面“json body:clusterRegion”已经提到。就说:“显示订阅失败,你看一下。”,然后给出订阅失败的截图,给出 请求体。