Commit e58564bc by zihan

去掉冗余代码;添加使用ws来获取行情

parent 01b69039
const WebSocket = require('ws');
const SocksProxyAgent = require('socks-proxy-agent');
const proxy = process.env.agent;
const agent = proxy ? new SocksProxyAgent(proxy) : null;
// const zlib = require('zlib');
const wsUrl = 'wss://push.bibox.com/';
const request = require('request');
const host = 'https://api.bibox.com';
const CrytoJS = require('crypto-js');
const constants = require('./constants');
class biboxApi {
constructor() {
this.apiKey = '61e6fa70476199d7245e2993b01d33ef14cb24dd';
this.apiSecret = '269bdc93d18f9215814a6a25dc38a4a8915cd63a';
this.allowRequest = true;
}
subscribeSymbolsAndTicker(symbols, callback) {
const channels = [];
const tickers = [];
// const channels = symbols.map((item) => {
// return JSON.stringify({event: "addChannel", "channel": `bibox_sub_spot_${item}_depth`, "binary": 1});
// });
for(let item of symbols){
channels.push(JSON.stringify({event: "addChannel", "channel": `bibox_sub_spot_${item}_depth`, "binary": 1}));
// tickers.push(JSON.stringify({event:"addChannel","channel":`bibox_sub_spot_${item}_ticker`,"binary":0}))
}
const wss = new WebSocket(wsUrl, {agent});
wss.on('open', () => {
console.log("websocket on open");
for (let i=0;i<channels.length;i++) {
wss.send(channels[i]);
wss.send(tickers[i]);
}
});
wss.on('message', (data) => {
const response = JSON.parse(data);
if (Array.isArray(response) && response.length && (response[0].channel.endsWith("depth") || response[0].channel.endsWith("ticker"))) {
callback(null, response[0]);
} else {
console.log(data);
callback(response, null);
}
});
wss.on('error', (error) => {
console.log(" websocket error:");
console.log(error);
})
wss.on('close', () => {
console.log("websocket closed");
setTimeout(()=>{
this.subscribeSymbolsAndTicker(symbols, callback);
},2000);
})
}
_publicRequest(path, params, callback) {
if(!this.allowRequest){
callback({code:"-2",message:"出现超频情况,暂停提交请求"});
return;
}
let url = host + path;
if (params) {
url += "?";
const keys = Object.keys(params);
for (let i = 0; i < keys.length; i++) {
url += `${keys[i]}=${params[keys[i]]}`;
if (i !== keys.length - 1) {
url += "&";
}
}
}
const options = {
url,
method: 'GET',
timeout: 8000,
forever: true,
};
request(options, (error, response, body) => {
if (error) {
callback(error);
} else {
try {
const result = JSON.parse(body);
if (result.result) {
callback(null, result);
} else {
callback(result, null);
}
} catch (e) {
console.log(e);
callback({statusCode:response.statusCode}, null);
}
}
});
}
_request(method, path, cmd, params, callback) {
if(!this.allowRequest){
callback({code:"-2",message:"出现超频情况,暂停提交请求"});
return;
}
const url = host + path;
const cmdStr = JSON.stringify([{"cmd": cmd, "body": params}]);
const sign = CrytoJS.HmacMD5(cmdStr, this.apiSecret).toString();
const form = {"cmds": cmdStr, "apikey": this.apiKey, "sign": sign};
const requestParams = {};
requestParams["url"] = url;
requestParams["method"] = method;
requestParams["form"] = form;
requestParams["timeout"] = 10000;
requestParams["forever"] = true;
request(requestParams, (error, response, body) => {
if (error) {
callback(error);
} else {
try {
const result = JSON.parse(body);
if (result.error) {
callback(result.error, null);
if(result.error.code == '2091' && this.allowRequest){
this.allowRequest = false;
console.log("请求超频,将会暂停发送请求");
setTimeout(()=>{
this.allowRequest = true;
},1000 * 60 * 5);
}
} else {
callback(null, result.result[0]);
}
} catch (e) {
console.error("parse body时出错");
console.error("status code:" + response.statusCode + ",body:" + response.body);
// if (response.statusCode === 429) {
// callback({statusCode: response.statusCode})
// } else {
callback(e, null);
// }
}
}
});
}
fetchSymbols(callback) {
this._publicRequest('/v1/mdata', {"cmd": 'pairList'}, callback);
}
order(price, amount, symbol, side, callback) {
//账户类型,0-普通账户,1-信用账户
//交易类型,1-市价单,2-限价单
//交易方向,1-买,2-卖
const params = {
pair: symbol,
account_type: 0,
order_type: 2,
order_side: side === constants.OrderSideBuy ? 1 : 2,
pay_bix: 1,
price,
amount,
// money:parseFloat(price) * parseFloat(amount)
};
this._request("POST", "/v1/orderpending", "orderpending/trade", params, callback)
}
balance(callback) {
this._request("POST", "/v1/transfer", "transfer/assets", {select: 1}, callback);
}
getTrades(symbol, callback) {
const params = {
"cmd": "deals",
"pair": symbol,
"size": 10
}
this._publicRequest("/v1/mdata", params, callback);
}
searchOrder(orderId, callback) {
const params = {
id: orderId
}
this._request("POST", "/v1/orderpending", "orderpending/order", params, callback);
}
cancelOrder(orderId, callback) {
const params = {
orders_id: orderId
}
this._request("POST", "/v1/orderpending", "orderpending/cancelTrade", params, callback);
}
fetchHistorOrders(page,size,symbol,side,callback){
const params = {
pair:symbol,
account_type:0,
page,
size,
order_side:side === constants.OrderSideBuy ? 1 : 2,
hide_cancel:0
};
this._request("POST","/v1/orderpending","orderpending/pendingHistoryList",params,callback);
}
getOrderbook(symbol,depth,callback){
this._publicRequest("/v1/mdata",{"pair":symbol,size:depth,cmd:'depth'},callback);
}
}
module.exports = biboxApi;
\ No newline at end of file
......@@ -2,10 +2,8 @@ const WebSocket = require('ws');
const SocksProxyAgent = require('socks-proxy-agent');
const proxy = process.env.agent;
const agent = proxy ? new SocksProxyAgent(proxy) : null;
// const zlib = require('zlib');
// const wsUrl = 'wss://push.bibox.com/';
const wsUrl = 'wss://push1.kucoin.com/endpoint';
const request = require('request');
// const host = 'https://api.kucoin.com';
const host = 'https://api.kucoin.com';
const CryptoJS = require('crypto-js');
const constants = require('./constants');
......@@ -18,46 +16,59 @@ class biboxApi {
this.allowRequest = true;
}
// subscribeSymbolsAndTicker(symbols, callback) {
// const channels = [];
// const tickers = [];
// // const channels = symbols.map((item) => {
// // return JSON.stringify({event: "addChannel", "channel": `bibox_sub_spot_${item}_depth`, "binary": 1});
// // });
// for(let item of symbols){
// channels.push(JSON.stringify({event: "addChannel", "channel": `bibox_sub_spot_${item}_depth`, "binary": 1}));
// // tickers.push(JSON.stringify({event:"addChannel","channel":`bibox_sub_spot_${item}_ticker`,"binary":0}))
// }
// const wss = new WebSocket(wsUrl, {agent});
// wss.on('open', () => {
// console.log("websocket on open");
// for (let i=0;i<channels.length;i++) {
// wss.send(channels[i]);
// wss.send(tickers[i]);
// }
// });
// wss.on('message', (data) => {
// const response = JSON.parse(data);
// if (Array.isArray(response) && response.length && (response[0].channel.endsWith("depth") || response[0].channel.endsWith("ticker"))) {
// callback(null, response[0]);
// } else {
// console.log(data);
// callback(response, null);
// }
// });
// wss.on('error', (error) => {
// console.log(" websocket error:");
// console.log(error);
// })
// wss.on('close', () => {
// console.log("websocket closed");
// setTimeout(()=>{
// this.subscribeSymbolsAndTicker(symbols, callback);
// },2000);
// })
// }
subscribeSymbolsAndTicker(symbols, callback) {
this._publicRequest('/v1/bullet/usercenter/loginUser',{protocol:'websocket',encrypt:true},(error,result)=>{
if(error){
console.log("fetch ws token error");
this.subscribeSymbolsAndTicker(symbols,callback);
}else{
const token = result.data.bulletToken;
this._doSubscribe(symbols,token,callback);
}
});
}
_doSubscribe(symbols,token, callback) {
const channels = [];
const tickers = [];
for (let item of symbols) {
channels.push(JSON.stringify({event: "addChannel", "channel": `bibox_sub_spot_${item}_depth`, "binary": 1}));
// tickers.push(JSON.stringify({event:"addChannel","channel":`bibox_sub_spot_${item}_ticker`,"binary":0}))
}
const reqURL = `${wsUrl}?bulletToken=${token}&format=json&resource=api`;
const wss = new WebSocket(reqURL, {agent});
wss.on('open', () => {
console.log("websocket on open");
});
wss.on('message', (data) => {
const response = JSON.parse(data);
if(response.type === 'ack'){
const id = response.id;
for(const symbol of symbols){
wss.send(JSON.stringify({id,type:'subscribe','topic':`/trade/${symbol}_TRADE`}))
}
setInterval(()=>{
wss.send(JSON.stringify({id,type:'ping'}))
},40000);
}else if(response.type === 'subscribe'){
callback(null,response);
}else{
console.log(data);
}
});
wss.on('error', (error) => {
console.log(" websocket error:");
console.log(error);
})
wss.on('close', () => {
console.log("websocket closed");
setTimeout(() => {
this.subscribeSymbolsAndTicker(symbols, callback);
}, 2000);
})
}
transform(obj) {
var str = [];
for (var p in obj)
......
const BaseCollector = require('./baseCollector');
const baseCurrencies = ["BIX","ETH","BTC","USDT"];
const restCurrencies = ["PRA","AAC","AT","RED","BU","QTUM","EOSDAC","HPB","ORME","TRX","TTC","BIX","BTC","ETH"];
const machine = process.env['MACHINE'];
const biboxApi = require('./api');
const zlib = require('zlib');
const sendEmail = require('./mailer');
let bixNumMonitor = 0;
const Strategy3MaxAmountMap = {
USDT: 1000,
BTC: 0.15,
ETH: 1.5,
BIX:600
}
class BiboxCollector extends BaseCollector{
constructor(wantedSymbols){
super("BIBOX",baseCurrencies,machine,wantedSymbols);
this.api = new biboxApi('','');
setInterval(()=>{
if(bixNumMonitor >=20){
sendEmail("Bibox 平台币数量已少于80","平台币数量不够啦,先手动处理一下啦");
bixNumMonitor = 0;
}
},1000 * 60 * 20);
}
_convertSymbolName(symbol){
return symbol.replace("_","");
}
_fetchSymbols(callback){
this.api.fetchSymbols((error,data)=>{
if(error){
console.error("fetch symbol error");
console.error(error);
this._fetchSymbols(callback);
}else{
const symbolDetails = data.result;
const symbolMap = {};
for(let detail of symbolDetails){
symbolMap[detail.pair] = detail;
}
callback(null,symbolMap);
}
})
}
_runMonitorInterval(){
return 4000;
}
_subscribeSymbols(symbols,callback,subscribeDepth){
// this._fetchDepthByRest(symbols,subscribeDepth,callback);
function subscribeCallback(error,data){
if(error){
console.error("subscribe error");
console.error(error);
}else if(data.channel.endsWith("depth")){
const symbol = data.channel.replace("bibox_sub_spot_","").replace("_depth","").replace("_","");
const decodeStr = Buffer.from(data.data,'base64');
const unzipData = zlib.unzipSync(decodeStr);
const result = JSON.parse(unzipData.toString());
// const symbol = result.pair.replace("_","");
const timeStamp = result.update_time;
const asks = result.asks.slice(0,subscribeDepth).map((item)=>[item.price,item.volume]);
const bids = result.bids.slice(0,subscribeDepth).map((item)=>[item.price,item.volume]);
callback(asks, bids, symbol, timeStamp);
// console.log("深度延时:"+(Date.now()-timeStamp));
}else{
const ticker = data.data;
const symbol = ticker.pair.replace("_","");
const bids = [[ticker.buy,ticker.buy_amount]];
const asks = [[ticker.sell,ticker.sell_amount]];
callback(asks,bids,symbol,ticker.timestamp);
console.log(symbol + "ticker 延时:"+(Date.now()-ticker.timestamp));
}
}
for(let i=0;i<7;i++){
setTimeout(()=>{
this.api.subscribeSymbolsAndTicker(symbols,subscribeCallback);
},i*30000);
}
}
_fetchDepthByRest(symbols,depth,callback){
const restSymbols = [];
for(let symbol of symbols){
const midCurrency = symbol.split("_")[0];
if(restCurrencies.includes(midCurrency)){
restSymbols.push(symbol);
}
}
const perInterval = 500;
const totalInterval = perInterval * restSymbols.length+50;
setInterval(()=>{
const sortedSymbols = restSymbols.sort(()=>{
return Math.random()>0.5
});
for(let i=0;i<sortedSymbols.length;i++){
const symbol = sortedSymbols[i];
setTimeout(()=>{
this.api.getOrderbook(symbol,depth,(error,result)=>{
if(error){
console.error("get depth by rest error:");
console.error(error);
return;
}
const data = result.result;
const timeStamp = data.update_time;
callback(data.asks.map((item)=>[item.price,item.volume]),data.bids.map((item)=>[item.price,item.volume]),symbol,timeStamp);
})
},i*perInterval);
}
},totalInterval);
}
_runMonitor(callback){
this.api.balance((error,result)=>{
if(error){
console.error("get balance error");
console.error(error);
callback(null);
}else{
const balanceList = result.result.assets_list;
const balanceMap = {};
for(let detail of balanceList){
if(detail.balance >0 || detail.freeze >0 || balanceMap[detail.coin_symbol]){
balanceMap[detail.coin_symbol] = {available:detail.balance,onOrder:detail.freeze};
}
}
callback(balanceMap);
if(this.getCurrencyBalance("BIX",true)<80){
bixNumMonitor ++ ;
}else {
bixNumMonitor >0 ?bixNumMonitor-- : bixNumMonitor=0;
}
}
})
}
getSymbol(fromCurrency,toCurrency){
let symbolDetail = this._getSymbolDetail(fromCurrency,toCurrency);
return symbolDetail.pair;
}
getFeeRate(fromCurrency,toCurrency){
return 0.0005;
}
processAmount(fromCurrency,toCurrency,amount){
//有可能会由于数值过小,传进来的amount被表示为科学计数法法
const amountStr = parseFloat(amount).toFixed(10);
const nums = amountStr.split('.');
if(nums.length ===2 && nums[1].length>4){
return nums[0]+"."+nums[1].slice(0,4);
}
return amount;
}
processPrice(fromCurrency,toCurrency,price){
// const amountStr = price+'';
// const nums = amountStr.split('.');
// if(nums.length ===2 && nums[1].length>4){
// return nums[0]+"."+nums[1].slice(0,4);
// }
return price;
}
getCurrencyMaxReturnAmount(currency){
const balance = this.getCurrencyBalance(currency);
return Math.min(Strategy3MaxAmountMap[currency],balance);
}
getDepthPrintStr(fromCurrency,toCurrency,depth=1){
return super.getDepthPrintStr(fromCurrency,toCurrency,3);
}
_allowPublish(asks,bids,symbol){
const balance = this.getCurrencyBalance("BIX");
if(balance < 80){
console.error("目前BIX数量少于80,暂停交易");
return false;
}
return super._allowPublish(asks,bids,symbol);
}
}
module.exports = BiboxCollector;
const Strategy3 = require('./strategy3');
const machine = process.env['MACHINE'];
const Order = require('./order');
const constants = require('./constants');
const STRATEGY3_RETURN_MIN_AMOUNT_MAP = {
USDT: 20,
BTC: 0.002,
ETH: 0.03,
BIX: 15,
};
class BiboxStrategy3 extends Strategy3 {
constructor(collector) {
super(collector, machine);
this.orderService = new Order();
}
_doTrade(baseCurrency1, midCurrency, baseCurrency2, buyPrice, sellPrice, returnPrice, amount, returnAmount, doSaveOrder) {
const buySymbol = this.collector.getSymbol(baseCurrency1, midCurrency);
const buyStartTime = Date.now();
const collector = this.collector;
const orderService = this.orderService;
const sellSymbol = this.collector.getSymbol(midCurrency, baseCurrency2);
const returnSymbol = this.collector.getSymbol(baseCurrency2, baseCurrency1);
let createdSellOrder = false;
let createdBuyOrder = false;
let retryTime = 8;
const totalRetryTime = 8;
function sellOrder() {
console.log("sell@" + sellSymbol + " amount:" + amount + " price:" + sellPrice);
const sellStartTime = Date.now();
orderService.order(
sellSymbol, sellPrice, amount, constants.OrderSideSell
, (error, order) => {
if (error) {
if(error.code === "2027"){
if(retryTime>0 && !createdSellOrder){
console.log("提示余额不足,再次尝试");
retryTime -- ;
setTimeout(()=>{
sellOrder();
},(totalRetryTime - retryTime)*80);
}else if(!createdSellOrder && createdBuyOrder){
console.log("已没有重试次数,但是买单已创建成功,继续重试");
setTimeout(()=>{
sellOrder();
},500);
}
}
console.error("sell error:" + JSON.stringify(error) + ";sell price:" + sellPrice + " @amount:" + amount);
return;
}
createdSellOrder = true;
doSaveOrder(order, midCurrency, baseCurrency2, constants.OrderTypeSell);
console.log("sell start@" + sellStartTime + " end@" + Date.now() + " symbol:" + sellSymbol);
})
}
function returnOrder() {
console.log("return@" + returnSymbol + " amount:" + returnAmount + " price:" + returnPrice);
const returnStartTime = Date.now();
orderService.order(returnSymbol, returnPrice, returnAmount, collector.getTradeSide(baseCurrency2, baseCurrency1)
, (error, order) => {
if (error) {
console.error("return error:");
console.error(error);
return;
}
console.log("return start@" + returnStartTime + " end@" + Date.now() + " symbol:" + returnSymbol);
doSaveOrder(order, baseCurrency2, baseCurrency1, constants.OrderTypeReturn);
}, true);
}
console.log("buy@" + buySymbol + " amount:" + amount + " price:" + buyPrice);
orderService.FOKLikeOrder(buySymbol, buyPrice, amount, constants.OrderSideBuy
, (error, order) => {
if (error) {
console.error("buy error:");
console.error(error);
return;
}
console.log("buy start@" + buyStartTime + " end@" + Date.now() + " symbol:" + buySymbol);
const delay = collector.isBaseCurrency(midCurrency)?0:100; //基础币有底仓,不需要考虑缓存问题
if (order.executedQty === order.origQty) {
console.log("买入全部成交");
createdBuyOrder = true;
setTimeout(()=>{
sellOrder();
},delay);
returnOrder();
} else if (parseFloat(order.executedQty) > 0) {
returnAmount = parseFloat(returnAmount) * parseFloat(order.executedQty) / parseFloat(amount);
returnAmount = collector.processAmount(baseCurrency2, baseCurrency1, returnAmount);
amount = collector.processAmount(midCurrency, baseCurrency2, parseFloat(order.executedQty));
console.log(`买入部分成交${order.executedQty} 回归量调整为${returnAmount}`);
order.remark = "部分成交@amount " + order.executedQty + ";";
createdBuyOrder = true;
if (parseFloat(amount) > 0)
setTimeout(()=>{
sellOrder();
},delay);
if (parseFloat(returnAmount) > 0)
returnOrder();
else {
console.log("回归数量太小,不执行回归");
}
} else {
console.warn("买入失败");
retryTime = 0;
}
doSaveOrder(order, baseCurrency1, midCurrency, constants.OrderTypeBuy);
})
// if(!this.collector.isBaseCurrency(midCurrency) && this.collector.getCurrencyBalance(midCurrency,true) < amount){
// for(let i=0;i<2;i++){
// setTimeout(()=>{
// sellOrder();
// },i*200+100);
// }
// }
}
_getTrades(order, callback) {
callback([]);
}
_getMinReturnAmount(currency) {
return STRATEGY3_RETURN_MIN_AMOUNT_MAP[currency] || 1;
}
_getMinTradeInterval(){
return 8000;
}
_needConsiderDepthCount(){
return [[2,1],[3,2,1],[2,1]];
}
_logDelay(){
return true;
}
_getMinMargin(){
return 0.05;
}
_giveUpOrder(baseCurrency1, midCurrency, baseCurrency2,totalMarginRate,buyDepth,sellDepth){
const sellSymbol = this.collector.getSymbol(midCurrency, baseCurrency2);
const sellDelay = Date.now() - this.collector.getSymbolEventTime(sellSymbol);
if(sellDelay > 25000){
console.log("卖出延时过大,放弃此单:"+sellDelay);
return true;
}
return super._giveUpOrder(baseCurrency1,midCurrency,baseCurrency2,totalMarginRate,buyDepth,sellDepth);
}
}
const BiboxCollector = require('./biboxCollector');
const collector = new BiboxCollector(null);
collector.runStrategy3();
const strategy3 = new BiboxStrategy3(collector);
strategy3.run();
......@@ -3,9 +3,10 @@ const baseCurrencies = ["ETH","BTC","USDT", "NEO", "KCS"];
const restCurrencies = ["PRA","AAC","AT","RED","BU","QTUM","EOSDAC","HPB","ORME","TRX","TTC", "USDT","BTC","ETH", "NEO", "KCS", ];
const machine = process.env['MACHINE'];
const biboxApi = require('./api_kucoin');
const zlib = require('zlib');
const IPReader = require('./util');
let coinInfoMap = null
const IPs = IPReader.allIps;
let coinInfoMap = null;
const totalOrderbook = {};
const Strategy3MaxAmountMap = {
USDT: 300,
BTC: 0.05,
......@@ -14,6 +15,68 @@ const Strategy3MaxAmountMap = {
KCS: 600,
}
function mergeDepthAsk(oldDepth, updateDepth) {
const results = [];
const cal = {old: 0, updated: 0};
for (; cal.old < oldDepth.length || cal.updated < updateDepth.length;) {
// if(results.length === length){
// break;
// }
if (oldDepth[cal.old] === undefined) {
updateDepth[cal.updated][1] > 1e-8 && results.push(updateDepth[cal.updated]);
cal.updated++;
} else if (updateDepth[cal.updated] === undefined) {
results.push(oldDepth[cal.old]);
cal.old++;
} else if (parseFloat(oldDepth[cal.old][0]) > parseFloat(updateDepth[cal.updated][0])) {
updateDepth[cal.updated][1] > 1e-8 && results.push(updateDepth[cal.updated]);
cal.updated++;
} else if (parseFloat(oldDepth[cal.old][0]) === parseFloat(updateDepth[cal.updated][0])) {
if (parseFloat(updateDepth[cal.updated][1]) > 1e-8) {
results.push(updateDepth[cal.updated]);
}
cal.updated++;
cal.old++;
} else {
results.push(oldDepth[cal.old]);
cal.old++;
}
}
return results
}
function mergeDepthBid(oldDepth, updateDepth) {
const result = [];
const cal = {old: 0, updated: 0};
for (; cal.old < oldDepth.length || cal.updated < updateDepth.length;) {
// if(result.length === length){
// break;
// }
if (oldDepth[cal.old] === undefined) {
updateDepth[cal.updated][1] > 1e-8 && result.push(updateDepth[cal.updated]);
cal.updated++;
} else if (updateDepth[cal.updated] === undefined) {
result.push(oldDepth[cal.old]);
cal.old++;
} else if (parseFloat(oldDepth[cal.old][0]) < parseFloat(updateDepth[cal.updated][0])) {
updateDepth[cal.updated][1] > 1e-8 && result.push(updateDepth[cal.updated]);
cal.updated++;
} else if (parseFloat(oldDepth[cal.old][0]) === parseFloat(updateDepth[cal.updated][0])) {
if (parseFloat(updateDepth[cal.updated][1]) > 1e-8) {
result.push(updateDepth[cal.updated++]);
cal.old++;
} else {
cal.updated++;
cal.old++;
}
} else {
result.push(oldDepth[cal.old++])
}
}
return result;
}
class BiboxCollector extends BaseCollector{
constructor(wantedSymbols){
super("KUCOIN",baseCurrencies,machine,wantedSymbols);
......@@ -43,34 +106,70 @@ class BiboxCollector extends BaseCollector{
return 10 *1000;
}
_requestSymbolFullOrderbook(index,symbols,depth,baseCollectorCallback){
if(index < symbols.length){
this._doSubscribeSymbols(symbols,baseCollectorCallback,depth);
}else{
const symbol = symbols[index];
const ipAddress = IPs[index%IPs.length];
this.api.getOrderbook(symbol,depth,ipAddress,(error,result)=>{
if(error){
console.error("get depth by rest error:");
console.error(error);
this._requestSymbolFullOrderbook(index,symbols,depth,baseCollectorCallback);
return;
}
const data = result.data;
const timeStamp = data.timestamp;
baseCollectorCallback(data.SELL.slice(0,depth).map((item)=>[item[0],item[1]]),data.BUY.slice(0,depth).map((item)=>[item[0],item[1]]),symbol,timeStamp);
totalOrderbook[symbol]={asks:data.SELL,bids:data.BUY};
this._requestSymbolFullOrderbook(index+1,symbols,depth,baseCollectorCallback);
});
}
}
_doSubscribeSymbols(symbols,callback,subscribeDepth){
}
_subscribeSymbols(symbols,callback,subscribeDepth){
this._fetchDepthByRest(symbols,subscribeDepth,callback);
// this.api.subscribeSymbols(symbols,(error,data)=>{
// if(error){
// console.error("subscribe error");
// console.error(error);
// }else{
// const symbol = data['pair'];
// // const symbol = result.pair.replace("_","");
// const timeStamp = data['TS'];
// const depth = data['depth'];
// const asks = depth.asks.slice(0,subscribeDepth).map((item)=>[item[0],item[1]]);
// const bids = depth.bids.slice(0,subscribeDepth).map((item)=>[item[0],item[1]]);
// callback(asks, bids, symbol, timeStamp);
// }
// })
// this._fetchDepthByRest(symbols,subscribeDepth,callback);
this.api.subscribeSymbolsAndTicker(symbols,(error,result)=>{
if(error){
console.log(error);
}else{
//{"data":{"volume":0.03502337,"price":0.03367632,"count":1.04,"action":"ADD","time":1538538428679,"type":"BUY"},"topic":"/trade/ETH-BTC_TRADE","type":"message","seq":32748778883081}
const timeStamp = result.data.time;
const symbol = result.data.topic.replace('/trade/','').replace('_TRADE','');
const updateData = [[result.data.price,result.data.count]];
let asks = totalOrderbook[symbol].asks|| [];
let bids = totalOrderbook[symbol].bids|| [];
if(result.data.type === 'BUY'){
asks = mergeDepthAsk(asks,updateData);
}else if(result.data.type === 'SELL'){
bids = mergeDepthBid(bids,updateData);
}
if(symbol === 'NEO-BTC'){
console.log("") //todo 这里输出日志,与网页对比
}
callback(asks.slice(0,subscribeDepth), bids.slice(0,subscribeDepth), symbol, timeStamp);
}
})
}
_fetchDepthByRest(symbols,depth,callback){
const restSymbols = symbols;
// for(let symbol of symbols){
// const midCurrency = symbol.split("-")[0];
// if(restCurrencies.includes(midCurrency)){
// restSymbols.push(symbol);
// }
// }
for(let symbol of symbols){
const midCurrency = symbol.split("-")[0];
if(restCurrencies.includes(midCurrency)){
restSymbols.push(symbol);
}
}
const perInterval = 8;
const totalInterval = perInterval * restSymbols.length;
const IPs = IPReader.allIps;
// const IPs = IPReader.allIps;
setInterval(()=>{
const sortedSymbols = restSymbols.sort(()=>{
return Math.random()>0.5
......@@ -87,7 +186,8 @@ class BiboxCollector extends BaseCollector{
}
const data = result.data;
const timeStamp = data.timestamp;
callback(data.SELL.map((item)=>[item[0],item[1]]),data.BUY.map((item)=>[item[0],item[1]]),symbol,timeStamp);
callback(data.SELL.slice(0,depth).map((item)=>[item[0],item[1]]),data.BUY.slice(0,depth).map((item)=>[item[0],item[1]]),symbol,timeStamp);
totalOrderbook[symbol]={asks:data.SELL,bids:data.BUY};
});
},i*perInterval);
}
......
const constants = require('./constants');
const biboxApi = require('./api');
const StateFilled = 3;
const StateNew = 1;
const StatePartiallyFilled = 2;
const StatePartiallyCancelled = 4;
const StateCancelled = 5;
const StateCancelling = 6;
function returnFakeOrder(symbol, price, amount) {
return {
orderId: "testOrder",
price: price,
status: constants.OrderStatusCanceled,
transactTime: Date.now(),
origQty: amount,
executedQty: 0,
symbol: symbol,
}
}
function convertToRecordOrder(order) {
let status;
switch (order.status) {
case StateFilled:
case StatePartiallyCancelled:
status = constants.OrderStatusFilled;
break;
case StateCancelled:
status = constants.OrderStatusCanceled;
break;
case StateNew:
status = constants.OrderStatusNew;
break;
case StatePartiallyFilled:
status = constants.OrderStatusPartiallyFilled;
break;
case StateCancelling:
status = constants.OrderStatusPendingCancel;
break;
}
return {
orderId: order.id + '',
price: order.price,
status: status,
transactTime: order.createdAt,
origQty: order.amount,
executedQty: order.deal_amount,
symbol: order.pair,
type: order.order_side,
}
}
function isTimeOutError(error){
return error.code === 'ESOCKETTIMEDOUT' || error.code === 'ETIMEDOUT';
}
class Order {
constructor() {
this.api = new biboxApi();
}
_handleFOKSearchResult(orderId, finalCallback, error, result) {
if (error) {
console.error("搜索订单出错");
console.error(error);
let timeout = 100;
setTimeout(() => {
this.api.searchOrder(orderId, this._handleFOKSearchResult.bind(this, orderId, finalCallback));
}, timeout);
return;
}
const order = result.result;
if(!order.amount && !order.deal_amount){
console.error("没有返回amount 和deal amount字段,继续搜索");
console.log(order);
this.api.searchOrder(orderId, this._handleFOKSearchResult.bind(this, orderId, finalCallback));
}else if (order.status === StateFilled || order.status === StateCancelled || order.status === StatePartiallyCancelled) {
finalCallback(null, convertToRecordOrder(order));
} else if (order.status === StateNew || order.status === StatePartiallyFilled) {
this.api.cancelOrder(orderId, (error, result) => {
// setTimeout(() => {
console.log("尝试取消,准备搜索订单");
this.api.searchOrder(orderId, this._handleFOKSearchResult.bind(this, orderId, finalCallback));
// }, 80)
})
} else {
setTimeout(() => {
console.log("订单尚未完成或取消,再次搜索");
this.api.searchOrder(orderId, this._handleFOKSearchResult.bind(this, orderId, finalCallback));
}, 500);
}
}
FOKLikeOrder(symbol, price, amount, side, callback) {
if (!constants.RealOrder) {
callback(null, returnFakeOrder(symbol, price, amount));
return;
}
this.api.order(price, amount, symbol, side, (error, result) => {
if (error) {
callback(error, null);
return;
}
const orderId = result.result;
setTimeout(()=>{
this.api.searchOrder(orderId, this._handleFOKSearchResult.bind(this, orderId, callback));
},50);
})
}
order(symbol, price, amount, side, callback,mustSuccess = false) {
if (!constants.RealOrder) {
callback(null, returnFakeOrder(symbol, price, amount));
return;
}
this.api.order(price, amount, symbol, side, (error, result) => {
if (error) {
callback(error, null);
return;
}
const orderId = result.result;
const api2 = this.api;
function handleSearchResult(error, result) {
if (error) {
console.error("搜索订单出错");
console.error(error);
let timeout = 100;
if(error.code == '2091' || error.code == '4003'){
console.log("2秒后再继续搜索");
timeout = 2000;
}
setTimeout(() => {
this.api.searchOrder(orderId, this.handleSearchResult.bind(this, orderId, callback));
}, timeout);
}
if(!result.result.amount){
console.error("没有返回amount,再次搜索");
api2.searchOrder(orderId, handleSearchResult.bind(this));
return;
}
callback(null, convertToRecordOrder(result.result));
}
this.api.searchOrder(orderId, handleSearchResult.bind(this));
})
}
}
module.exports = Order;
\ No newline at end of file
......@@ -2,40 +2,9 @@ const WebSocket = require('ws');
const SocksProxyAgent = require('socks-proxy-agent');
const proxy = process.env.agent;
const agent = proxy ? new SocksProxyAgent(proxy) : null;
const zlib = require('zlib');
const BiboxCollector = require('./biboxCollector');
const biboxApi = require('./api');
const constants = require('./constants');
const Order = require('./order');
function testWs() {
this.ws = new WebSocket('wss://push.bibox.com/', {agent});
const wss = this.ws;
this.ws.on('open', () => {
console.log("成功启动collector");
wss.send(JSON.stringify({"event":"addChannel","channel":"bibox_sub_spot_BTC_USDT_depth","binary":0}));
});
this.ws.on('message', (data)=>{
// console.log(data);
const response = JSON.parse(data)[0];
const decodeStr = Buffer.from(response.data,'base64');
zlib.unzip(decodeStr,(result1,result2)=>{
console.log(result1);
console.log(result2.toString());
})
});
this.ws.on('error', (error) => {
console.log("collector websocket error:");
console.log(error);
})
this.ws.on('close', () => {
console.log("websocket closed");
})
}
// testWs();
function testCollector(){
const collector = new BiboxCollector(null);
collector.runStrategy3();
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment