summaryrefslogtreecommitdiffstats
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/dht.c127
-rw-r--r--src/main.c47
2 files changed, 125 insertions, 49 deletions
diff --git a/src/dht.c b/src/dht.c
index 137c5d8..c5fd98b 100644
--- a/src/dht.c
+++ b/src/dht.c
@@ -84,6 +84,8 @@ struct node {
time_t last_received; /**< time when I received the last packet from it */
time_t last_sent; /**< time when I sent the last query to it. not incremented if it has unanswered queries. */
struct node * next;
+#define SERVER_ERRORS_BAD 15
+ unsigned server_errors; /**< number of server errors, set node grade to bad if more than SERVER_ERRORS_BAD. reset when replied() */
};
/**
@@ -147,6 +149,8 @@ char * node_grade_str (enum node_grade g) {
#define QUESTIONABLE_AFTER (15*60)
enum node_grade node_grade (const struct node * n) {
+ if (n->server_errors > SERVER_ERRORS_BAD)
+ return bad;
if (n->last_received + QUESTIONABLE_AFTER < seconds()) {
if (n->last_sent + 60 < seconds() && n->unanswered > 1)
return bad;
@@ -236,7 +240,8 @@ enum flags {
goodmeta = 1 << 3, /**< peer gave us good metadata that is currently stored in the torrent */
badmeta = 1 << 4, /**< peer gave us bogus metadata that does not match it's hash */
unreachable = 1 << 5, /**< pear unreachable - timed out */
- requested = 1 << 6 /**< because choking is unimplemented, packets are sent like with TFTP, a request is sent only when a reply is received */
+ requested = 1 << 6, /**< because choking is unimplemented, packets are sent like with TFTP, a request is sent only when a reply is received */
+ protocolerror = 1 << 7 /**< I don't understand you my g */
};
/**
@@ -278,7 +283,7 @@ void peer_print (FILE * s, const struct peer * p) {
char remote[INET6_ADDRSTRLEN + 64];
if (!inet_ntop(p->addr.sin6_family, p->addr.sin6_addr.s6_addr, remote, INET6_ADDRSTRLEN+7))
snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno));
- fprintf(s, "%s/%d %s%s%s%s%s%s", remote, ntohs(p->addr.sin6_port), p->flags & nometasupport ? " nometasupport" : "", p->flags & nometa ? " nometa" : "", p->flags & goodmeta ? " goodmeta" : "", p->flags & badmeta ? " badmeta" : "", p->flags & unreachable ? " unreachable" : "", p->flags & requested ? " requested" : "");
+ fprintf(s, "%s/%d %s%s%s%s%s%s%s", remote, ntohs(p->addr.sin6_port), p->flags & nometasupport ? " nometasupport" : "", p->flags & nometa ? " nometa" : "", p->flags & goodmeta ? " goodmeta" : "", p->flags & badmeta ? " badmeta" : "", p->flags & unreachable ? " unreachable" : "", p->flags & requested ? " requested" : "", p->flags & protocolerror ? " protocolerror" : "");
}
/**
@@ -337,6 +342,7 @@ struct torrent {
unsigned char * packet; /**< packet being constructed from tcp for info torrents, 32727 bytes */
int recvd; /**< length of received data for current packet */
char * software; /**< can be read from disconnection() - software string client sent, may be NULL */
+ time_t ttl; /**< if nonzero, torrent will get his ->type cleared after this seconds() timestamp. set to seconds()+512 for example */
};
/**
@@ -522,6 +528,10 @@ struct dht {
#define PINGS_CAP 256 /**< capacity of circular buffer, one element is ~28 bytes, so this is 7168 B */
struct sockaddr_in6 pings[PINGS_CAP]; /**< circular buffer of recent pings */
unsigned periods; /**< number of times periodic() was called */
+ unsigned rxqp;
+ unsigned txqp;
+ unsigned rxrp;
+ unsigned txrp;
};
/**
@@ -601,6 +611,13 @@ void possible_torrent (struct dht * d __attribute__((unused)), const unsigned ch
*/
void sendb (struct dht * d, struct bencoding * b, const struct sockaddr_in6 * a) {
+ struct bencoding * y = bpath(b, "y");
+ if (y && y->type & string && y->valuelen >= 1) {
+ if (y->value[0] == 'r')
+ d->txrp++;
+ else
+ d->txqp++;
+ }
char remote[INET6_ADDRSTRLEN + 64];
if (!inet_ntop(a->sin6_family, &a->sin6_addr, remote, INET6_ADDRSTRLEN+7))
snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno));
@@ -1129,7 +1146,7 @@ unsigned int distance (const unsigned char * a, const unsigned char * b) {
}
/**
- * returns 1 if bucket is perfect, meaning it is fresh, has K nodes, and all nodes are good. bucket that contains id is almost never perfect, as it can usually be split into smaller buckets, that's why param d is required to get own id
+ * returns 1 if bucket is perfect, meaning it is fresh, has K nodes, and no node is bad. bucket that contains id is almost never perfect, as it can usually be split into smaller buckets, that's why param d is required to get own id
*
* if d is NULL, it's not checked whether we fall into the bucket and whether it could be split
*
@@ -1137,9 +1154,9 @@ unsigned int distance (const unsigned char * a, const unsigned char * b) {
* @param b [in] the bucket
*/
-int bucket_good (const struct dht * d, const struct bucket * b) {
+int bucket_grade (const struct dht * d, const struct bucket * b) {
if (d) {
- if (!bucket_good(NULL, b))
+ if (!bucket_grade(NULL, b))
return 0;
if (in_bucket(d->id, b)) {
struct node * n = b->nodes;
@@ -1156,7 +1173,7 @@ int bucket_good (const struct dht * d, const struct bucket * b) {
return 0;
struct node * n = b->nodes;
if (n) {
- if (node_grade(n) != good)
+ if (node_grade(n) == bad)
return 0;
n = n->next;
}
@@ -1179,8 +1196,6 @@ int bucket_good (const struct dht * d, const struct bucket * b) {
*/
void replied (const struct dht * d, const unsigned char * id, const struct sockaddr_in6 * addr) {
- if (!memcmp(d->id, id, 20)) // WE COULDN'T'VE POSSIBLY REPLIED TO OURSELVES!
- return;
struct bucket * b = d->buckets;
if (family(addr->sin6_addr.s6_addr) == AF_INET6)
b = d->buckets6;
@@ -1188,10 +1203,12 @@ void replied (const struct dht * d, const unsigned char * id, const struct socka
struct node * found = find(id, &b, &n);
if (found) {
found->last_received = seconds();
- found->unanswered = 0;
+ found->server_errors = found->unanswered = 0;
return;
}
- if (bucket_good(d, b))
+ if (!memcmp(d->id, id, 15)) // WE COULDN'T'VE POSSIBLY REPLIED TO OURSELVES - or sybil attack
+ return;
+ if (bucket_grade(d, b))
return;
struct node * node = node_init();
memcpy(&node->addr, addr, sizeof *addr);
@@ -1252,14 +1269,14 @@ void replied (const struct dht * d, const unsigned char * id, const struct socka
void potential_node (struct dht * d, const struct sockaddr_in6 * a, const unsigned char * id) {
if (!a->sin6_port)
return; // sorry, I can't send to port 0. this is a mistake or a malicious node
- if (!memcmp(d->id, id, 20)) // we are not a potential node of ourselves
+ if (!memcmp(d->id, id, 15)) // we are not a potential node of ourselves, if 15 bytes are same : sybil
return;
struct bucket * bucket = d->buckets;
if (family(a->sin6_addr.s6_addr) == AF_INET6)
bucket = d->buckets6;
if (find(id, &bucket, NULL))
return;
- if (!bucket_good(d, bucket))
+ if (!bucket_grade(d, bucket))
ping_node(d, a);
}
@@ -1323,7 +1340,7 @@ void remove_torrent (struct dht * d, struct torrent * t) {
void oom (struct dht * d) {
struct torrent * drop = d->last_torrent;
- while (drop && drop->type)
+ while (drop && (drop->type || drop->dl))
drop = drop->prev;
remove_torrent(d, drop);
}
@@ -1345,6 +1362,8 @@ struct torrent * add_torrent (struct dht * d, struct torrent * t) {
torrent_free(t);
return found;
}
+ if (d->torrents_num >= d->torrents_max)
+ oom(d);
if (d->torrents)
d->torrents->prev = t;
else
@@ -1359,8 +1378,6 @@ struct torrent * add_torrent (struct dht * d, struct torrent * t) {
else
memcpy(d->sample+20*(rand() % 3000), t->hash, 20);
#endif
- if (d->torrents_num >= d->torrents_max)
- oom(d);
return t;
}
@@ -1385,12 +1402,14 @@ struct peer * add_peer (struct dht * d, struct torrent * t, struct peer * p) {
unsigned l = 0;
while (*peer) {
l++;
- if (!memcmp(&(*peer)->addr, &p->addr, sizeof p->addr)) {
+ if (!memcmp((*peer)->addr.sin6_addr.s6_addr, p->addr.sin6_addr.s6_addr, 16)) { // ignore multiple peers on same port
peer_free(p);
return *peer;
}
if (*peer != t->dl)
nondl = peer;
+ else // dls are holy
+ goto c;
if ((*peer)->flags & badmeta)
bad = peer;
if ((*peer)->flags & nometasupport && !(bad && (*bad) && (*bad)->flags & badmeta))
@@ -1404,6 +1423,7 @@ struct peer * add_peer (struct dht * d, struct torrent * t, struct peer * p) {
*peer = next;
continue;
}
+ c:
peer = &(*peer)->next;
}
if (bad && l > d->peers_per_torrent_max) {
@@ -1592,9 +1612,10 @@ void compact (struct dht * d, const char * value, int len, struct torrent * t) {
memcpy(addr.sin6_addr.s6_addr+(len == 4+2+20 ? 12 : 0), value + 20, len == 4+2+20 ? 4 : 16);
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wpointer-sign"
- potential_node(d, &addr, value); // NOTE02 at the beginning, a lot of packets will be sent, since every reply of potential_node will generate K replies. naively this would generate an exponentially increasing number of packets, in increasing powers of 8 (8**n). to prevent an absolute resource hog, this is only done when node would be useful and would contribute to the routing table
if (t)
potential_torrent_node(d, t, &addr, value);
+ else
+ potential_node(d, &addr, value); // NOTE02 at the beginning, a lot of packets will be sent, since every reply of potential_node will generate K replies. naively this would generate an exponentially increasing number of packets, in increasing powers of 8 (8**n). to prevent an absolute resource hog, this is only done when node would be useful and would contribute to the routing table
#pragma GCC diagnostic pop
}
@@ -1736,6 +1757,12 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) {
L(incoming_dht, d, "handle(%s): %s", remote, out);
}
struct bencoding * y = bpath(b, "y");
+ if (y && y->type & string && y->valuelen >= 1) {
+ if (y->value[0] == 'r')
+ d->rxrp++;
+ else
+ d->rxqp++;
+ }
char * msg_type = "";
if (y && y->type & string)
msg_type = y->value;
@@ -1864,18 +1891,16 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) {
struct torrent * torrent = find_torrent(d, hash->value, 20);
d->possible_torrent(d, hash->value, torrent);
#pragma GCC diagnostic pop
- unsigned i = 0;
+ unsigned i = 8;
if (torrent) {
struct peer * peer = torrent->peers;
struct bencoding * values = calloc(1, sizeof *values);
values->type = list;
- while (peer) { // TODO implement peer preference: prefer sending peers that responded to us
+ while (i-- && peer) { // TODO implement peer preference: prefer sending peers that responded to us
if (family(peer->addr.sin6_addr.s6_addr) != family(addr.sin6_addr.s6_addr)) // possible
goto c;
- if (peer->flags & unreachable)
+ if (peer->flags & (unreachable || protocolerror))
goto c;
- if (i++ > K)
- break;
struct bencoding * value = calloc(1, sizeof *value);
memcpy((value->value = malloc((value->valuelen = ADDRLEN(family(peer->addr.sin6_addr.s6_addr))+2))), peer->addr.sin6_addr.s6_addr, ADDRLEN(family(peer->addr.sin6_addr.s6_addr)));
memcpy(value->value+ADDRLEN(family(peer->addr.sin6_addr.s6_addr)), &peer->addr.sin6_port, 2);
@@ -1988,10 +2013,10 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) {
struct bencoding * nodes = bpath(b, "r/nodes");
struct bencoding * nodes6 = bpath(b, "r/nodes6");
if (nodes && nodes->type & string && !(nodes->valuelen % 26))
- for (unsigned i = 0; i < MIN(nodes->valuelen, K); i += 26)
+ for (unsigned i = 0; i < MIN(nodes->valuelen, K*26); i += 26)
compact(d, nodes->value+i, 26, torrent);
if (nodes6 && nodes6->type & string && !(nodes6->valuelen % 38))
- for (unsigned i = 0; i < MIN(nodes6->valuelen, K); i += 38)
+ for (unsigned i = 0; i < MIN(nodes6->valuelen, K*38); i += 38)
compact(d, nodes6->value+i, 38, torrent);
break;
case 'E':
@@ -2148,8 +2173,8 @@ int refresh (struct dht * d, int fam) {
node_free(old);
continue;
case questionable:
- if (!(rand() % (QUESTIONABLE_AFTER/PERIODIC)))
- ping_node(d, &(*n)->addr); // NOTE03 about not pinging questionable nodes: this ensures a constant regeneration of the routing table. this is just an idea, if the client frequently gets in a situation without any nodes in the routing table, remove the comment before ping_node call.
+ // if (!(rand() % ())) // I disabled it again because it was too spammy
+ // ping_node(d, &(*n)->addr); // NOTE03 about not pinging questionable nodes: this ensures a constant regeneration of the routing table. this is just an idea, if the client frequently gets in a situation without any nodes in the routing table, remove the comment before ping_node call.
break; // update on why I uncommented: to mitigate sybil attack, it's baje important to prefer old nodes
case good:
nrgood++;
@@ -2159,7 +2184,8 @@ int refresh (struct dht * d, int fam) {
}
b = b->next;
}
- if (buckets > 32) { // sybil attack - node is broken - clear whole routing table, keeping one bucket
+ if (buckets > 64) { // sybil attack - node is broken - clear whole routing table, keeping one bucket
+ dht_print(d->log, d);
L(disagreement, d, "@@@@@@ SYBIL ATTACK - CLEARING ROUTING TABLE @@@@@@");
int keep_first = rand() % 2; // should we even keep one bucket? the sybil node has a 1/2
if (keep_first) { // chance of having stared in the bucket farthest away, so it's stored there ...
@@ -2215,7 +2241,6 @@ int refresh (struct dht * d, int fam) {
void periodic (struct dht * d) {
d->periods++;
- L(debug, d, "called");
int dns = 0;
if (!refresh(d, AF_INET))
dns++;
@@ -2277,6 +2302,8 @@ void periodic (struct dht * d) {
;
struct torrent * t = d->torrents;
while (t) {
+ if (t->ttl && seconds() > t->ttl)
+ t->type = 0;
if (t->type & (peers | announce)) {
/*
struct node * n = t->nodes;
@@ -2388,7 +2415,7 @@ void periodic (struct dht * d) {
struct peer * p = t->peers;
int c = 0;
while (p) {
- if (!(p->flags & (badmeta | nometasupport | unreachable)))
+ if (!(p->flags & (badmeta | nometasupport | unreachable | protocolerror)))
c++;
p = p->next;
}
@@ -2397,7 +2424,7 @@ void periodic (struct dht * d) {
int s = rand() % c; // OB1 untested
p = t->peers;
while (p) {
- if (!(p->flags & (badmeta | nometasupport | unreachable)) && !s--) {
+ if (!(p->flags & (badmeta | nometasupport | unreachable | protocolerror)) && !s--) {
t->dl = p;
t->state = 0;
t->socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0);
@@ -2435,7 +2462,10 @@ void periodic (struct dht * d) {
a:
t = t->next;
}
- return;
+ L(debug, d, "txqp=%u rxrp=%u rxqp=%u txrp=%u", d->txqp, d->rxrp, d->rxqp, d->txrp);
+ if (d->txqp > 16384 || d->rxrp > 16384 || d->rxqp > 16384 || d->txrp > 16384)
+ raise(SIGINT);
+ d->txqp = d->txrp = d->rxqp = d->rxrp = 0;
}
/**
@@ -2506,9 +2536,10 @@ void tcp_work (struct dht * d) {
}
if (!(t->state & ~(handshake_sent | incoming | outgoing))) {
int ret = recv(t->socket, packet, 1+19+8+20*2, MSG_DONTWAIT);
+ L(expected, d, "handshake recv returned value %d, t->recvd == %d", ret, t->recvd);
if (ret == 0) {
L(disagreement, d, "received 0 bytes instead of handshake. EOF");
- t->dl->flags |= nometasupport;
+ t->dl->flags |= protocolerror;
disconnect(t);
goto c;
}
@@ -2522,7 +2553,7 @@ void tcp_work (struct dht * d) {
}
if (ret < 1+19+8+20*2) { // c'mon, this could've arrived in one packet HACK UGLY!
L(disagreement, d, "expected handshake, received only %d bytes", ret);
- t->dl->flags |= nometasupport; // cause it sent a nonsensical packet
+ t->dl->flags |= protocolerror; // cause it sent a nonsensical packet
disconnect(t);
goto c;
}
@@ -2554,31 +2585,44 @@ void tcp_work (struct dht * d) {
disconnect(t);
}
goto c;
+ } else if (!ret){
+ L(disagreement, d, "peer EOF");
+ t->dl->flags |= protocolerror;
+ disconnect(t);
+ goto c;
} else {
- uint32_t l = ntohl(*((uint32_t *) t->packet));
- L(debug, d, "found length of a packet to be %u", l);
d->tr += ret;
t->recvd += ret;
}
+ if (t->recvd == 4) {
+ uint32_t l = ntohl(*((uint32_t *) t->packet));
+ L(debug, d, "found length of a packet to be %u", l);
+ }
}
if (t->recvd >= 4) {
+ char buf[41];
+ buf[40] = '\0';
+ bin2hex(buf, t->packet, MIN(20, t->recvd));
if (t->packet[0]) {
- char buf[41];
- buf[40] = '\0';
- bin2hex(buf, t->packet, 20);
- L(disagreement, d, "peer wants to send too big of a packet %s", buf);
- t->dl->flags |= nometasupport; // too big pkt, sorry
+ L(disagreement, d, "peer wants to send too big of a packet %.*s", MIN(20, t->recvd)*2, buf);
+ t->dl->flags |= protocolerror; // too big pkt, sorry
disconnect(t);
goto c;
}
uint32_t l = ntohl(*((uint32_t *) t->packet));
- int ret = recv(t->socket, t->packet+t->recvd, MIN(l, 32727-t->recvd), MSG_DONTWAIT);
+ int ret = recv(t->socket, t->packet+t->recvd, MIN(l-t->recvd+4, 32727-t->recvd), MSG_DONTWAIT);
+ L(debug, d, "reading packet content: read %d bytes", ret);
if (ret < 0) {
if (errno != EAGAIN) {
L(std_fail, d, "recv(TCP): %s (%d)", strerror(errno), errno);
disconnect(t);
}
goto c;
+ } else if (!ret) {
+ L(disagreement, d, "peer EOF");
+ t->dl->flags |= protocolerror;
+ disconnect(t);
+ goto c;
} else {
d->tr += ret;
t->recvd += ret;
@@ -2720,6 +2764,7 @@ void tcp_work (struct dht * d) {
}
free_bencoding(e);
end_packet:
+ L(debug, d, "cleared packet recvd to 0");
t->recvd = 0;
}
/*
diff --git a/src/main.c b/src/main.c
index 7527921..4368e21 100644
--- a/src/main.c
+++ b/src/main.c
@@ -31,19 +31,33 @@ void handler (int s) {
break;
}
}
+time_t last_added = 0;
void found_torrent (struct dht * d __attribute__((unused)), const unsigned char * h, struct torrent * t) {
char buf[128];
bin2hex(buf, h, 20);
buf[40] = '\0';
- L(debug, d, "magnet:?xt=urn:btih:%s%s", buf, t ? " stored" : "");
+ struct stat statbuf;
+ strcat(buf, ".torrent");
+ if (!stat(buf, &statbuf)) {
+ L(expected, d, "%s already exists", buf);
+ return;
+ }
+ L(debug, d, "%s%s", buf, t ? " stored" : " new");
if (t) {
- struct stat statbuf;
- strcat(buf, ".torrent");
- if (!stat(buf, &statbuf)) {
- L(expected, d, "%s already exists", buf);
+ if (!t->type)
+ t->ttl = seconds()+512;
+ t->type |= info | peers;
+ } else {
+ if (last_added + 10 > seconds()) {
+ L(debug, d, "not adding a torrent this fast");
return;
- } else
- t->type |= info | peers;
+ }
+ last_added = seconds();
+ t = torrent_init();
+ memcpy(t->hash, h, 20);
+ t->type |= info | peers;
+ t->ttl = seconds()+128;
+ add_torrent(d, t);
}
}
int main (int argc, char ** argv) {
@@ -114,6 +128,8 @@ int main (int argc, char ** argv) {
dht->pollfds_size = &pollfds_size;
dht->nfds = &nfds;
dht->verbosity |= (getenv("TRAVNIK_INCOMING_DHT") ? incoming_dht : 0) | (getenv("TRAVNIK_OUTGOING_DHT") ? outgoing_dht : 0) | expected | debug;
+ dht->torrents_max = K;
+ dht->peers_per_torrent_max = K;
struct torrent * torrent = torrent_init();
memcpy(torrent->hash, "\xdd\x82\x55\xec\xdc\x7c\xa5\x5f\xb0\xbb\xf8\x13\x23\xd8\x70\x62\xdb\x1f\x6d\x1c", 20);
torrent->type = /* (useless, since we have no listening system yet) announce | */ peers | info;
@@ -121,8 +137,22 @@ int main (int argc, char ** argv) {
periodic(dht);
// alarm(PERIODIC);
w:
- while (poll(pollfds, nfds, -1) != -1) // can't timeout
+ while (poll(pollfds, nfds, -1) != -1) { // can't timeout
+ if (sigusr1) {
+ sigusr1 = 0;
+ dht_print(stdout, dht);
+ goto w;
+ }
+ if (periodično) {
+ periodično = 0;
+ // alarm(PERIODIC);
+ periodic(dht);
+ goto w;
+ }
+ if (samomor)
+ goto s;
work(dht);
+ }
switch (errno) {
case EINTR:
if (sigusr1) {
@@ -146,6 +176,7 @@ w:
r = 115;
goto r;
}
+ s:
config = persistent(dht);
dht_free(dht);
dht = NULL;