Commit 52a3cc3a by zihan

kucoin更改

parent ad7e5447
......@@ -2,13 +2,14 @@ const WebSocket = require('ws');
const SocksProxyAgent = require('socks-proxy-agent');
const proxy = process.env.agent;
const agent = proxy ? new SocksProxyAgent(proxy) : null;
const wsUrl = 'wss://push1.kucoin.com/endpoint';
const request = require('request');
const host = 'https://api.kucoin.com';
const CryptoJS = require('crypto-js');
const constants = require('./constants');
const {IPs, mergeDepth} = require('./util');
const {IPs, mergeDepthAsk, mergeDepthBids, mergeDepth} = require('./util');
let totalOrderbook = {};
let cachedUpdate = {};
const fetchingMap = {};
let getUpdatedData = function (currentData, response) {
......@@ -24,7 +25,7 @@ let getUpdatedData = function (currentData, response) {
}
else if (response.data.action === 'CANCEL') {
newCount -= response.data.count;
if(newCount <0){
if (newCount < 0) {
newCount = 0;
}
}
......@@ -32,7 +33,7 @@ let getUpdatedData = function (currentData, response) {
}
}
if (!has) {
if(response.data.action !== 'CANCEL')
if (response.data.action !== 'CANCEL')
updateData.push([response.data.price, response.data.count]);
} else {
// updateData = [[response.data.price, newCount]];
......@@ -46,66 +47,80 @@ class biboxApi {
this.apiKey = '5ba054d2e0abb830dee81e1d';
this.apiSecret = 'baa55ba9-7290-431e-9fd4-42aa4598887f';
this.allowRequest = true;
this.index = 0;
}
_selectIp(){
const ip = IPs[this.index % IPs.length];
this.index++;
return ip;
}
subscribeSymbols(symbols, depth, callback) {
this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{
if(error){
const ip = this._selectIp();
this._publicRequest('/api/v1/bullet-public', {}, (error, result) => {
if (error) {
console.log("fetch ws token error");
this.subscribeSymbols(symbols, depth, callback);
}else{
// for(let i=0;i<2;i++){
const ip = IPs[0];
// setTimeout(()=>{
this._openWs(result, symbols, depth,ip, callback);
// },i*10000);
// }
} else {
this._openWs(result, symbols, depth, ip, callback);
}
});
}, ip, 'POST');
}
_openWs(result, symbols, depth, ipAddress, callback) {
const token = result.data.bulletToken;
const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`;
const wss = new WebSocket(reqURL, {agent,localAddress:ipAddress});
const token = result.data.token;
const serverInfo = result.data.instanceServers[0];
const pingInterval = serverInfo.pingInterval;
const wsUrl = serverInfo.endpoint;
const reqURL = `${wsUrl}?token=${token}&connectId=${Date.now()}`;
const wss = new WebSocket(reqURL, {agent, localAddress: ipAddress});
wss.on('open', () => {
console.log("websocket on open");
});
wss.on('message', (data) => {
const response = JSON.parse(data);
if (response.type === 'ack') {
if (response.type === 'welcome') {
const id = response.id;
setInterval(() => {
if (wss.readyState === constants.SocketReadyStateOpen) {
wss.send(JSON.stringify({id, type: 'ping'}))
}
}, 40000);
for (let i=0;i<symbols.length;i++) {
const symbol = symbols[i];
}, pingInterval);
const topics = symbols.join(",");
wss.send(JSON.stringify({id,type:"subscribe",topic:`/market/level2:${topics}`,response:true}));
} else if (response.topic && response.topic.startsWith('/market/level2')) {
const symbol = response.data.symbol;
const snapshot = totalOrderbook[symbol];
if(!snapshot ){
if(!fetchingMap[symbol]){
fetchingMap[symbol] = true;
setTimeout(()=>{
this._subscribeSymbol(wss, id, symbol, depth)
},i*10);
}
} else if (response.topic && response.topic.startsWith('/trade')) {
const timeStamp = response.data.time;
const symbol = response.topic.replace('/trade/', '').replace('_TRADE', '');
let asks = totalOrderbook[symbol].asks || [];
let bids = totalOrderbook[symbol].bids || [];
if (response.data.type === 'SELL') {
const updateData = getUpdatedData(asks, response);
asks = mergeDepth(asks, updateData, true);
} else if (response.data.type === 'BUY') {
const updateData = getUpdatedData(bids, response);
bids = mergeDepth(bids, updateData, false);
}
totalOrderbook[symbol].bids = bids;
totalOrderbook[symbol].asks = asks;
const ret = {data: {SELL: asks, BUY: bids}, timestamp: timeStamp, symbol: symbol,seq:response.seq};
callback(null, ret);
}
// else {
// // console.log(data);
this.getOrderbook(symbol,100,ipAddress,this._handleOrderbookSnapshot.bind(this,symbol));
},100);
}
let array = cachedUpdate[symbol];
if(!array){
array = [];
cachedUpdate[symbol] = array;
}
array.push(response.data);
}else{
const updatedAsks = response.data.changes.asks;
const updatedBids = response.data.changes.bids;
const mergedAsks = mergeDepth(snapshot.asks,updatedAsks,true);
const mergedBids = mergeDepth(snapshot.bids, updatedBids, false);
// if(symbol === 'BCHSV-BTC'){
// console.log(updatedBids.toString())
// console.log(snapshot.bids.toString());
// console.log(mergedBids.toString());
// console.log("====================")
// }
snapshot.asks = mergedAsks.slice(0,50);
snapshot.bids = mergedBids.slice(0,50);
callback(null,{symbol,asks:mergedAsks, bids:mergedBids, timestamp:Date.now()});
}
}
});
wss.on('error', (error) => {
console.log("websocket error:");
......@@ -114,32 +129,60 @@ class biboxApi {
wss.on('close', () => {
console.log("websocket closed");
setTimeout(() => {
// this.subscribeSymbols(symbols, depth, callback);
this._openWs(result,symbols,depth,ipAddress,callback);
this._openWs(result, symbols, depth, ipAddress, callback);
}, 2000);
})
}
_subscribeSymbol(wss, id, symbol, depth){
const ipAddress = IPs[Math.round(Math.random()*(IPs.length-1))];
this.getOrderbook(symbol,depth,ipAddress,(error,result)=>{
_handleOrderbookSnapshot(symbol,error,data){
fetchingMap[symbol] = false;
if(error){
console.error("get symbol by rest error:");
console.error("get snapshot error:"+symbol);
console.error(error);
setTimeout(()=>{
this._subscribeSymbol(wss,id,symbol,depth);
},2000);
return;
}
const data = result.data;
const oldData = totalOrderbook[symbol];
if(!oldData || data.timestamp > oldData.timestamp){
totalOrderbook[symbol]={asks:data.SELL.map((item)=>[item[0],item[1]]),bids:data.BUY.map((item)=>[item[0],item[1]]),timestamp:data.timestamp};
}
wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`}))
totalOrderbook[symbol] = data.data;
const lupdates = cachedUpdate[symbol];
for(const update of lupdates){
const updatedAsks = update.changes.asks;
const updatedBids = update.changes.bids;
const filteredAsks = updatedAsks.filter((item)=>{
return item[2] > data.data.sequence;
});
const filteredBids = updatedBids.filter((item)=>{
return item[2] > data.data.sequence;
})
const mergedAsks = mergeDepth(totalOrderbook[symbol].asks,filteredAsks,true);
const mergedBids = mergeDepth(totalOrderbook[symbol].bids, filteredBids, false);
totalOrderbook[symbol].asks = mergedAsks;
totalOrderbook[symbol].bids = mergedBids;
}
}
// _subscribeSymbol(wss, id, symbol, depth) {
// const ipAddress = IPs[Math.round(Math.random() * (IPs.length - 1))];
// this.getOrderbook(symbol, depth, ipAddress, (error, result) => {
// if (error) {
// console.error("get symbol by rest error:");
// console.error(error);
// setTimeout(() => {
// this._subscribeSymbol(wss, id, symbol, depth);
// }, 2000);
// return;
// }
// const data = result.data;
// const oldData = totalOrderbook[symbol];
// if (!oldData || data.timestamp > oldData.timestamp) {
// totalOrderbook[symbol] = {
// asks: data.SELL.map((item) => [item[0], item[1]]),
// bids: data.BUY.map((item) => [item[0], item[1]]),
// timestamp: data.timestamp
// };
// }
// wss.send(JSON.stringify({id, type: 'subscribe', 'topic': `/trade/${symbol}_TRADE`}))
// });
// }
transform(obj) {
var str = [];
for (var p in obj)
......@@ -157,13 +200,13 @@ class biboxApi {
return newObj;//返回排好序的新对象
}
_publicRequest(path, params,callback,bindIP) {
if(!this.allowRequest){
callback({code:"-2",message:"出现超频情况,暂停提交请求"});
_publicRequest(path, params, callback, bindIP, method = 'GET') {
if (!this.allowRequest) {
callback({code: "-2", message: "出现超频情况,暂停提交请求"});
return;
}
let url = host + path;
if (params) {
if (params && method === 'GET') {
url += "?";
const keys = Object.keys(params);
for (let i = 0; i < keys.length; i++) {
......@@ -175,10 +218,10 @@ class biboxApi {
}
const options = {
url,
method: 'GET',
method: method,
timeout: 8000,
forever: true,
localAddress:bindIP,
localAddress: bindIP,
};
request(options, (error, response, body) => {
......@@ -187,7 +230,7 @@ class biboxApi {
} else {
try {
const result = JSON.parse(body);
if (result.success) {
if (result.code === '200000') {
callback(null, result);
} else {
callback(result, null);
......@@ -195,18 +238,19 @@ class biboxApi {
} catch (e) {
// console.error(e);
// console.error(body);
if(response.statusCode == 200){
if (response.statusCode == 200) {
console.log(body);
console.log(e);
}
callback({statusCode:response.statusCode}, null);
callback({statusCode: response.statusCode}, null);
}
}
});
}
_request(method, path, params, callback) {
if(!this.allowRequest){
callback({code:"-2",message:"出现超频情况,暂停提交请求"});
if (!this.allowRequest) {
callback({code: "-2", message: "出现超频情况,暂停提交请求"});
return;
}
let url = host + path;
......@@ -223,7 +267,7 @@ class biboxApi {
}
const requestParams = {};
let form = params
if (method === 'GET'){
if (method === 'GET') {
url += '?' + this.transform(params);
form = {}
}
......@@ -247,7 +291,7 @@ class biboxApi {
} catch (e) {
console.error("parse body时出错");
console.error("status code:" + response.statusCode);
callback({statusCode:response.statusCode}, null)
callback({statusCode: response.statusCode}, null)
// if (response.statusCode === 429) {
// callback({statusCode: response.statusCode})
// } else {
......@@ -260,7 +304,7 @@ class biboxApi {
fetchSymbols(callback) {
this._publicRequest('/v1/market/open/symbols', {}, callback);
this._publicRequest('/api/v1/symbols', {}, callback);
}
order(price, amount, symbol, side, callback) {
......@@ -279,11 +323,11 @@ class biboxApi {
}
balance(limit, page, callback) {
this._request("GET", "/v1/account/balances",{limit: limit || 20, page: page || 1}, callback);
this._request("GET", "/v1/account/balances", {limit: limit || 20, page: page || 1}, callback);
}
coins_info(callback) {
this._publicRequest("/v1/market/open/coins",{}, callback);
this._publicRequest("/v1/market/open/coins", {}, callback);
}
getTrades(symbol, callback) {
......@@ -308,19 +352,20 @@ class biboxApi {
this._request("POST", "/v1/cancel-order", params, callback);
}
fetchHistorOrders(page,size,symbol,side,callback){
fetchHistorOrders(page, size, symbol, side, callback) {
const params = {
pair:symbol,
account_type:0,
pair: symbol,
account_type: 0,
page,
size,
order_side:side === constants.OrderSideBuy ? 1 : 2,
hide_cancel:0
order_side: side === constants.OrderSideBuy ? 1 : 2,
hide_cancel: 0
};
this._request("POST","/v1/orderpending","orderpending/pendingHistoryList",params,callback);
this._request("POST", "/v1/orderpending", "orderpending/pendingHistoryList", params, callback);
}
getOrderbook(symbol,depth,bindIP,callback){
this._publicRequest("/v1/open/orders",{"symbol":symbol,"limit":25},callback,bindIP);
getOrderbook(symbol, depth, bindIP, callback) {
this._publicRequest("/api/v1/market/orderbook/level2_100", {"symbol": symbol}, callback, bindIP);
}
}
......
......@@ -234,7 +234,17 @@ class BaseCollector {
this.currencySymbolMap[midCurrency] = [symbol];
}
}
this._subscribeSymbols(symbolNames,this._publishDataForStrategy3.bind(this),5);
const filteredSymbols = symbolNames.filter((item)=>{
const keySymbol = this._convertSymbolName(item);
const mid = this.getMidCurrency(keySymbol);
if(this.baseCurrencies.includes(mid)){
return true;
}
const array = this.getSymbols(mid);
return array.length >1
});
this._subscribeSymbols(filteredSymbols,this._publishDataForStrategy3.bind(this),5);
}
runStrategy3(){
......
......@@ -13,7 +13,7 @@ const Strategy3MaxAmountMap = {
KCS: 600,
}
class BiboxCollector extends BaseCollector{
class KucoinCollector extends BaseCollector{
constructor(wantedSymbols){
super("KUCOIN",baseCurrencies,machine,wantedSymbols);
this.api = new biboxApi('','');
......@@ -31,7 +31,7 @@ class BiboxCollector extends BaseCollector{
const symbolDetails = data.data;
const symbolMap = {};
for(let detail of symbolDetails){
if(baseCurrencies.includes(detail.coinTypePair)){
if(baseCurrencies.includes(detail.quoteCurrency) && detail.enableTrading){
symbolMap[detail.symbol] = detail;
}
}
......@@ -45,7 +45,7 @@ class BiboxCollector extends BaseCollector{
}
_subscribeSymbols(symbols,callback,subscribeDepth){
this._fetchDepthByWebsocket(symbols,subscribeDepth,callback)
this._fetchDepthByWebsocket(symbols.slice(0,50),subscribeDepth,callback)
// setTimeout(()=>{
// this._fetchDepthByRest(symbols,subscribeDepth,callback);
// },20000);
......@@ -80,17 +80,29 @@ class BiboxCollector extends BaseCollector{
_fetchDepthByWebsocket(symbols, depth, callback){
this.api.subscribeSymbols(symbols, depth, (error, result)=>{
function subsCallback(error,result){
if(error){
console.error("subscribe error");
console.error(error);
}else{
const data = result.data;
const timeStamp = result.timestamp;
const symbol = result.symbol;
callback(data.SELL.slice(0, depth), data.BUY.slice(0, depth), symbol, timeStamp,result.seq);
callback(result.asks.slice(0,depth), result.bids.slice(0,depth), result.symbol, result.timestamp);
}
}
let buffer = [];
for (let i = 0; i < symbols.length; i++) {
buffer.push(symbols[i]);
if (buffer.length === 95) {
const subsBuffer = buffer;
setTimeout(() => {
this.api.subscribeSymbols(subsBuffer, depth, subsCallback)
}, ((i + 1) / 10 - 1) * 50);
buffer = [];
}else if(i === symbols.length-1){
setTimeout(() => {
this.api.subscribeSymbols(buffer, depth, subsCallback)
}, ((i + 1) / 10 - 1) * 50);
}
}
})
}
_getCoinsInfo(){
......@@ -111,11 +123,14 @@ class BiboxCollector extends BaseCollector{
});
}
_runMonitor(callback){
callback({});
return;
//todo 接口也改了
let balanceMap = {};
let need = 1;
if(!coinInfoMap){
this._getCoinsInfo()
}
// if(!coinInfoMap){
// this._getCoinsInfo()
// }
this.api.balance(20, 1, (error,result)=>{
if(error){
console.error("get balance by rest error:");
......@@ -206,5 +221,5 @@ class BiboxCollector extends BaseCollector{
}
module.exports = BiboxCollector;
module.exports = KucoinCollector;
......@@ -20,6 +20,7 @@ class BiboxStrategy3 extends Strategy3 {
}
_doTrade(baseCurrency1, midCurrency, baseCurrency2, buyPrice, sellPrice, returnPrice, amount, returnAmount, doSaveOrder) {
return;
const buySymbol = this.collector.getSymbol(baseCurrency1, midCurrency);
const buyStartTime = Date.now();
const collector = this.collector;
......
......@@ -43,7 +43,7 @@ function testCollector(){
// const strategy3 = new BiboxStrategy3(collector);
// strategy3.run();
}
// testCollector();
testCollector();
function printCurrency(){
const currentArray = [,'ETH','BTC','LTC','BCH','USDT','USD','RMB',"RCN","WINGS","TRX","LEND","CMT","POWR","HSR","GAS","RDN","TNT","OAX"
......@@ -121,7 +121,7 @@ function printCurrency(){
}
})
}
printCurrency()
// printCurrency()
function testOrder(){
const order = new Order();
......
......@@ -39,8 +39,66 @@ function mergeDepth(oldDepth, updateDepth, isAsk) {
return i
}
function mergeDepthAsk(oldDepth, updateDepth,oldVersion) {
const results = [];
const cal = {old: 0, updated: 0};
for (; cal.old < oldDepth.length || cal.updated < updateDepth.length;) {
if (oldDepth[cal.old] === undefined) {
updateDepth[cal.updated].size > 1e-8 && results.push(updateDepth[cal.updated]);
cal.updated++;
} else if (updateDepth[cal.updated] === undefined) {
results.push(oldDepth[cal.old]);
cal.old++;
} else if (parseFloat(oldDepth[cal.old].price) > parseFloat(updateDepth[cal.updated].price)) {
updateDepth[cal.updated].size > 1e-8 && results.push(updateDepth[cal.updated]);
cal.updated++;
} else if (parseFloat(oldDepth[cal.old].price) === parseFloat(updateDepth[cal.updated].price)) {
if (parseFloat(updateDepth[cal.updated].size) > 1e-8) {
results.push(updateDepth[cal.updated]);
}
cal.updated++;
cal.old++;
} else {
results.push(oldDepth[cal.old]);
cal.old++;
}
}
return results
}
function mergeDepthBids(oldDepth, updateDepth, oldVersion) {
const result = [];
const cal = {old: 0, updated: 0};
for (; cal.old < oldDepth.length || cal.updated < updateDepth.length;) {
if (oldDepth[cal.old] === undefined) {
updateDepth[cal.updated].size > 1e-8 && result.push(updateDepth[cal.updated]);
cal.updated++;
} else if (updateDepth[cal.updated] === undefined) {
result.push(oldDepth[cal.old]);
cal.old++;
} else if (parseFloat(oldDepth[cal.old].price) < parseFloat(updateDepth[cal.updated].price)) {
updateDepth[cal.updated].size > 1e-8 && result.push(updateDepth[cal.updated]);
cal.updated++;
} else if (parseFloat(oldDepth[cal.old].price) === parseFloat(updateDepth[cal.updated].price)) {
if (parseFloat(updateDepth[cal.updated].size) > 1e-8) {
result.push(updateDepth[cal.updated++]);
cal.old++;
} else {
cal.updated++;
cal.old++;
}
} else {
result.push(oldDepth[cal.old++])
}
}
return result;
}
module.exports = {
IPs: ips,
mergeDepth: mergeDepth
mergeDepthBids:mergeDepthBids,
mergeDepthAsk:mergeDepthAsk,
mergeDepth:mergeDepth
}
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment