Commit 09eaf605 by zihan

使用3个ws链接

parent 576f0d0c
...@@ -47,111 +47,121 @@ class biboxApi { ...@@ -47,111 +47,121 @@ class biboxApi {
this.allowRequest = true; this.allowRequest = true;
} }
subscribeSymbolsAndTicker(symbols, callback) { // subscribeSymbolsAndTicker(symbols, callback) {
// this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{
// if(error){
// console.log("fetch ws token error");
// this.subscribeSymbolsAndTicker(symbols,callback);
// }else{
// const token = result.data.bulletToken;
// this._openWs(symbols,token,callback);
// }
// });
// }
// _openWs(symbols,token, callback) {
// const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`;
// const wss = new WebSocket(reqURL, {agent});
// wss.on('open', () => {
// console.log("websocket on open");
// });
// wss.on('message', (data) => {
// const response = JSON.parse(data);
// if(response.type === 'ack'){
// const id = response.id;
// for(const symbol of symbols){
// wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`}))
// }
// setInterval(()=>{
// wss.send(JSON.stringify({id,type:'ping'}))
// },40000);
// }else if(response.type === 'subscribe'){
// callback(null,response);
// }else{
// console.log(data);
// }
// });
// wss.on('error', (error) => {
// console.log(" websocket error:");
// console.log(error);
// })
// wss.on('close', () => {
// console.log("websocket closed");
// setTimeout(() => {
// this.subscribeSymbolsAndTicker(symbols, callback);
// }, 2000);
// })
// }
subscribeSymbols(symbols, depth, callback) {
this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{ this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{
if(error){ if(error){
console.log("fetch ws token error"); console.log("fetch ws token error");
this.subscribeSymbolsAndTicker(symbols,callback); this.subscribeSymbols(symbols, depth, callback);
}else{ }else{
const token = result.data.bulletToken; for(let i=0;i<3;i++){
this._doSubscribe(symbols,token,callback); const ip = IPs.length >i ? IPs[i]:IPs[IPs.length -1];
setTimeout(()=>{
this._openWs(result, symbols, depth,ip, callback);
},i*10000);
}
} }
}); });
} }
_openWs(result, symbols, depth, ipAddress, callback) {
_doSubscribe(symbols,token, callback) { const token = result.data.bulletToken;
const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`; const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`;
const wss = new WebSocket(reqURL, {agent}); const wss = new WebSocket(reqURL, {agent,localAddress:ipAddress});
wss.on('open', () => { wss.on('open', () => {
console.log("websocket on open"); console.log("websocket on open");
}); });
wss.on('message', (data) => { wss.on('message', (data) => {
const response = JSON.parse(data); const response = JSON.parse(data);
if(response.type === 'ack'){ if (response.type === 'ack') {
const id = response.id; const id = response.id;
for(const symbol of symbols){ setInterval(() => {
wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`})) if (wss.readyState === constants.SocketReadyStateOpen) {
wss.send(JSON.stringify({id, type: 'ping'}))
}
}, 40000);
for (const symbol of symbols) {
this._subscribeSymbol(wss, id, symbol, depth)
} }
setInterval(()=>{ } else if (response.topic && response.topic.startsWith('/trade')) {
wss.send(JSON.stringify({id,type:'ping'})) const timeStamp = response.data.time;
},40000); const symbol = response.topic.replace('/trade/', '').replace('_TRADE', '');
}else if(response.type === 'subscribe'){ let asks = totalOrderbook[symbol].asks || [];
callback(null,response); let bids = totalOrderbook[symbol].bids || [];
}else{ if (response.data.type === 'SELL') {
console.log(data); const updateData = getUpdatedData(asks, response);
asks = mergeDepth(asks, updateData, true);
} else if (response.data.type === 'BUY') {
const updateData = getUpdatedData(bids, response);
bids = mergeDepth(bids, updateData, false);
}
totalOrderbook[symbol].bids = bids;
totalOrderbook[symbol].asks = asks;
const ret = {data: {SELL: asks, BUY: bids}, timestamp: timeStamp, symbol: symbol}
callback(null, ret);
} }
// else {
// // console.log(data);
// }
}); });
wss.on('error', (error) => { wss.on('error', (error) => {
console.log(" websocket error:"); console.log("websocket error:");
console.log(error); console.log(error);
}) })
wss.on('close', () => { wss.on('close', () => {
console.log("websocket closed"); console.log("websocket closed");
setTimeout(() => { setTimeout(() => {
this.subscribeSymbolsAndTicker(symbols, callback); this.subscribeSymbols(symbols, depth, callback);
}, 2000); }, 2000);
}) })
} }
subscribeSymbols(symbols, depth, callback) {
this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{
if(error){
console.log("fetch ws token error");
this.subscribeSymbols(symbols, depth, callback);
}else{
const token = result.data.bulletToken;
const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`;
const wss = new WebSocket(reqURL, {agent});
wss.on('open', () => {
console.log("websocket on open");
});
wss.on('message', (data) => {
const response = JSON.parse(data);
if(response.type === 'ack'){
const id = response.id;
setInterval(()=>{
if(wss.readyState === constants.SocketReadyStateOpen){
wss.send(JSON.stringify({id,type:'ping'}))
}
},40000);
for(const symbol of symbols){
this._subscribeSymbol(wss, id, symbol, depth)
}
}else if(response.topic && response.topic.startsWith('/trade')){
const timeStamp = response.data.time;
const symbol = response.topic.replace('/trade/','').replace('_TRADE','');
let asks = totalOrderbook[symbol].asks || [];
let bids = totalOrderbook[symbol].bids || [];
if(response.data.type === 'SELL'){
const updateData = getUpdatedData(asks, response);
asks = mergeDepth(asks, updateData, true);
}else if(response.data.type === 'BUY'){
const updateData = getUpdatedData(bids,response);
bids = mergeDepth(bids, updateData, false);
}
totalOrderbook[symbol].bids = bids;
totalOrderbook[symbol].asks = asks;
const ret = {data: {SELL: asks, BUY: bids}, timestamp: timeStamp, symbol: symbol}
callback(null,ret);
}else{
// console.log(data);
}
});
wss.on('error', (error) => {
console.log("websocket error:");
console.log(error);
})
wss.on('close', () => {
console.log("websocket closed");
setTimeout(() => {
this.subscribeSymbols(symbols, depth, callback);
}, 2000);
})
}
});
}
_subscribeSymbol(wss, id, symbol, depth){ _subscribeSymbol(wss, id, symbol, depth){
const ipAddress = IPs[Math.round(Math.random()*(IPs.length-1))]; const ipAddress = IPs[Math.round(Math.random()*(IPs.length-1))];
this.getOrderbook(symbol,depth,ipAddress,(error,result)=>{ this.getOrderbook(symbol,depth,ipAddress,(error,result)=>{
...@@ -162,7 +172,10 @@ class biboxApi { ...@@ -162,7 +172,10 @@ class biboxApi {
return; return;
} }
const data = result.data; const data = result.data;
totalOrderbook[symbol]={asks:data.SELL,bids:data.BUY}; const oldData = totalOrderbook[symbol];
if(oldData && data.timestamp > oldData.timestamp){
totalOrderbook[symbol]={asks:data.SELL,bids:data.BUY,timestamp:data.timestamp};
}
wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`})) wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`}))
}); });
} }
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment