diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/dht.c | 373 | ||||
-rw-r--r-- | src/main.c | 2 |
2 files changed, 347 insertions, 28 deletions
@@ -162,12 +162,22 @@ void bucket_free (struct bucket * b) { free(b); } +enum flags { + pex = 1 << 0, /**< peer supports BEP-0011 pex */ + nometasupport = 1 << 1, /**< peer does not support BEP-0009 metadata fetch extension */ + nometa = 1 << 2, /**< peer has no metadata downloaded */ + goodmeta = 1 << 3, /**< peer gave us good metadata that is currently stored in the torrent */ + badmeta = 1 << 4, /**< peer gave us bogus metadata that does not match it's hash */ + unreachable = 1 << 5, /**< pear unreachable - timed out */ + requested = 1 << 6 /**< because choking is unimplemented, packets are sent like with TFTP, a request is sent only when a reply is received */ +}; + /** * peer of a torrent */ struct peer { - int socket; /**< tcp socket for bep-0009 */ + enum flags flags; struct sockaddr_in6 addr; /**< peer ip address and port */ struct peer * next; }; @@ -201,7 +211,7 @@ void peer_print (FILE * s, const struct peer * p) { char remote[INET6_ADDRSTRLEN + 64]; if (!inet_ntop(p->addr.sin6_family, p->addr.sin6_addr.s6_addr, remote, INET6_ADDRSTRLEN+7)) snprintf(remote, sizeof remote, "(inet_ntop: %s)", strerror(errno)); - fprintf(s, "%s:%d", remote, ntohs(p->addr.sin6_port)); + fprintf(s, "%s:%d %s%s%s%s%s%s%s", remote, ntohs(p->addr.sin6_port), p->flags & pex ? " pex" : "", p->flags & nometasupport ? " nometasupport" : "", p->flags & nometa ? " nometa" : "", p->flags & goodmeta ? " goodmeta" : "", p->flags & badmeta ? " badmeta" : "", p->flags & unreachable ? " unreachable" : "", p->flags & requested ? " requested" : ""); } /** @@ -209,29 +219,69 @@ void peer_print (FILE * s, const struct peer * p) { */ enum interested { - announce = 1 << 0, /**< will announce myself on every work() call with no packet */ + announce = 1 << 0, /**< will announce myself periodically */ peers = 1 << 1, /**< will get peers on every work() call with no packet */ - info = 1 << 2 /**< download metadata into `dht->dl`/$hash.info */ + info = 1 << 2 /**< download metadata */ +}; + +/** + * state a tcp peer connection + */ + +enum state { + blank = 0, + handshake_sent = 1, + handshake_received = 1 << 1, + extension_sent = 1 << 2, + extension_received = 1 << 3 }; +#define DLTO 60 /**< timeout for torrent downloading - if this amount of seconds has passed and no new metadata piece was received, mark peer as unreach and disconnect */ + /** - * torrent we are interested in + * torrent we are either interested in or are storing metadata for */ struct torrent { + unsigned char ut_metadata; /**< remote's byte for metadata transfer */ + unsigned char ut_pex; /**< remote's byte for pex */ + enum state state; /**< state of tcp connection */ + int socket; /**< tcp socket for bep-0009, enough to be stored in torrent since we only connect to one peer at a time */ + void * userdata; /**< library user may use this to his will in dht->connection() and peer->disconnection() */ + void (* disconnection)(struct torrent *); /**< user provided function that is called before torrent->socket is to be closed and must be removed from the pollfds list. that's a perfect time to store metadata, provided that ->dl->flags & goodmeta */ + struct peer * dl; /**< peer that's currently used for downloading metadata (only one at a time) */ + unsigned time; /**< beginning of metadata download process or time of last received metadata piece */ enum interested type; /**< is truthy only for manually added torrents */ unsigned char hash[20]; /**< infohash */ struct peer * peers; struct node * nodes; /**< closest K DHT nodes to this hash, used only for announce, peers and info torrents */ struct torrent * next; struct torrent * prev; /**< prev is here so that we can easily pop the oldest torrent. dht->last_torrent is useful here */ + int progress; /**< number of pieces of metadata already downloaded */ + int size; /**< number of bytes of metadata for info torrents */ + unsigned char * metadata; /**< metadata being downloaded */ }; /** + * initializes a torrent + * + * @return torrent object allocated on heap, memory ownership/responsibility is transfered to caller + */ + +struct torrent * torrent_init (void) { + struct torrent * t = calloc(1, sizeof *t); + t->socket = -1; + return t; +} + +/** * free a torrent object and it's nodes and peers */ void torrent_free (struct torrent * t) { + t->disconnection(t); + if (t->socket != -1) + close(t->socket); struct node * n = t->nodes; while (n) { struct node * old = n; @@ -244,6 +294,7 @@ void torrent_free (struct torrent * t) { p = p->next; peer_free(old); } + free(t->metadata); free(t); } @@ -256,6 +307,22 @@ int torrent_compare (const void * a, const void * b) { } /** + * kill peer connection and metadata download of torrent + */ + +void disconnect (struct torrent * t) { + t->disconnection(t); + close(t->socket); + t->socket = -1; + t->state = 0; + t->dl = NULL; + t->size = 0; + t->progress = 0; + t->ut_metadata = 0; + t->ut_pex = 0; +} + +/** * prints a torrent, for debugging purposes * * @param s [out] stream to which to write to @@ -266,7 +333,7 @@ void torrent_print (FILE * s, const struct torrent * t) { char buf[41]; buf[40] = '\0'; bin2hex(buf, t->hash, 20); - printf("magnet:?xt=urn:btih:%s\n\t**** PEERS ****\n", buf); + printf("magnet:?xt=urn:btih:%s%s%s%s\n\t**** PEERS ****\n", buf, t->type & announce ? " announce" : "", t->type & peers ? " peers" : "", t->type & info ? " info" : ""); struct peer * p = t->peers; while (p) { fprintf(s, "\t"); @@ -296,7 +363,6 @@ struct dht { struct bucket * buckets; struct bucket * buckets6; /**< IPv6 routing table */ struct torrent * torrents; /**< linked list of torrents for which we want to know peers */ - int dl; /**< dirfd storage directory for info torrents */ void (* possible_torrent)(struct dht *, const unsigned char *); /**< a user callback function that is called whenever we come across a torrent hash from a network */ void * userdata; /**< unused, but left for the library user to set so he can refer back to his structures from callback code, such as dht->possible_torrent(d, h) */ unsigned torrents_num; /**< number of torrents. this number can rise indefinitely, so it can, and should be capped by the caller, depending on available memory */ @@ -310,6 +376,8 @@ struct dht { unsigned txp; /**< statistics: total number of sent packets */ unsigned rxb; /**< statistics: total number of bytes in received UDP bodies */ unsigned txb; /**< statistics: total number of bytes in transmitted UDP bodies */ + unsigned tcp_max; /**< max number of tcp connections to peers */ + void * (* connection)(struct dht *, struct torrent *); /**< function for maintaining file descriptors for metadata downloading. this function is called with the torrent that just attempted to connect to a peer. it's the responsibility of the library user to then add the torrent->socket fd to a poll list of fds and to add a function in torrent->disconnection that will be called whenever the fd is closed so that the library user may remove the fd from the pollfds list. */ }; /** @@ -326,7 +394,7 @@ void dht_print (FILE * s, const struct dht * d) { char secret[17*2]; secret[17*2+1] = '\0'; bin2hex(secret, d->secret, 16); - fprintf(s, "id=%s socket=%d dl=%d t=%u p=%u tmax=%u pmax=%u p/t-max=%u runsec=%ld rxp=%u txp=%u rxb=%u txb=%u secret=%s\n", buf, d->socket, d->dl, d->torrents_num, d->peers_num, d->torrents_max, d->peers_max, d->peers_per_torrent_max, seconds()-d->time, d->rxp, d->txp, d->rxb, d->txb, secret); + fprintf(s, "id=%s socket=%d t=%u p=%u tmax=%u pmax=%u p/t-max=%u runsec=%ld rxp=%u txp=%u rxb=%u txb=%u secret=%s\n", buf, d->socket, d->torrents_num, d->peers_num, d->torrents_max, d->peers_max, d->peers_per_torrent_max, seconds()-d->time, d->rxp, d->txp, d->rxb, d->txb, secret); printf("**** TORRENTS ****\n"); struct torrent * t = d->torrents; while (t) { @@ -398,9 +466,9 @@ void sendb (struct dht * d, struct bencoding * b, const struct sockaddr_in6 * a) binsert(b, v); struct bencoding * ip = calloc(1, sizeof *ip); ip->type = string; - ip->key = bstr(strdup("key")); - memcpy((b->value = malloc(18)), a->sin6_addr.s6_addr + (family(a->sin6_addr.s6_addr) == AF_INET ? 12 : 0), (b->valuelen = family(a->sin6_addr.s6_addr) == AF_INET ? 6 : 18)); - memcpy(b->value + (family(a->sin6_addr.s6_addr) == AF_INET ? 4 : 16), &a->sin6_port, 2); + ip->key = bstr(strdup("ip")); + memcpy((ip->value = malloc(18)), a->sin6_addr.s6_addr + (family(a->sin6_addr.s6_addr) == AF_INET ? 12 : 0), (ip->valuelen = family(a->sin6_addr.s6_addr) == AF_INET ? 6 : 18)); + memcpy(ip->value + (family(a->sin6_addr.s6_addr) == AF_INET ? 4 : 16), &a->sin6_port, 2); binsert(b, ip); int len = b2json_length(b); char json[len+1]; @@ -508,10 +576,9 @@ struct dht * dht_init (const struct bencoding * c) { struct dht * d = calloc(1, sizeof *d); d->time = seconds(); d->log = stderr; - d->dl = -1; d->buckets = bucket_init(); d->buckets6 = bucket_init(); - d->possible_torrent = &possible_torrent; + d->possible_torrent = possible_torrent; d->torrents_max = UINT_MAX; // this is hardcore - so many torrents makes LL traversal too slow d->peers_max = UINT_MAX; // there's no way there even are this many peers on the entire network at a time xDDDDDDDDDDD d->peers_per_torrent_max = UINT_MAX; @@ -1085,7 +1152,7 @@ void oom (struct dht * d) { /** * adds a torrent to a list of torrents * - * if the torrent already exists in the database flags of this one will be anded with the flags of the old one, meaning this function can be used to set peers, announce, info and dl flags. see @return for important details. + * if the torrent already exists in the database flags of this one will be anded with the flags of the old one, meaning this function can be used to set peers, announce and info flags. see @return for important details. * * @param d [in] dht library handler, for counting torrents in storage * @param t [in] torrent object, whose memory ownership is transfered to the library and must be heap allocated @@ -1122,20 +1189,51 @@ struct torrent * add_torrent (struct dht * d, struct torrent * t) { */ struct peer * add_peer (struct dht * d, struct torrent * t, struct peer * p) { - struct peer * peer = t->peers; - while (peer) { - if (!memcmp(&peer->addr, &p->addr, sizeof p->addr)) { + struct peer ** peer = &t->peers; + struct peer ** bad = NULL; + struct peer ** nondl = NULL; + unsigned l = 0; + while (*peer) { + l++; + if (!memcmp(&(*peer)->addr, &p->addr, sizeof p->addr)) { peer_free(p); - return peer; + return *peer; + } + if (*peer != t->dl) + nondl = peer; + if ((*peer)->flags & badmeta) + bad = peer; + if ((*peer)->flags & nometasupport && !(bad && (*bad) && (*bad)->flags & badmeta)) + bad = peer; + if ((*peer)->flags & unreachable && !(bad && (*bad) && (*bad)->flags & (badmeta | nometasupport))) + bad = peer; + if (l > d->peers_per_torrent_max && !bad) { + l--; + struct peer * next = (*peer)->next; + peer_free(*peer); + *peer = next; + continue; } - peer = peer->next; + peer = &(*peer)->next; + } + if (bad && l > d->peers_per_torrent_max) { + l--; + struct peer * old = *bad; + *bad = (*bad)->next; + peer_free(old); + } + if (nondl && l > d->peers_per_torrent_max) { + l--; + struct peer * old = *nondl; + *nondl = (*nondl)->next; + peer_free(old); } p->next = t->peers; t->peers = p; d->peers_num++; if (d->peers_num >= d->peers_max) oom(d); - return peer; + return p; } /** @@ -1646,7 +1744,7 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { break; case 'A': // announce case 'a': - ; + raise(SIGINT); tok = bpath(b, "a/token"); #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wpointer-sign" @@ -1671,16 +1769,16 @@ void handle (struct dht * d, char * pkt, int len, struct sockaddr_in6 addr) { y = bstr(strdup("r")); y->key = bstr(strdup("y")); binsert(response, y); - torrent = calloc(1, sizeof *torrent); + sendb(d, response, &addr); + free_bencoding(response); + torrent = torrent_init(); // gucci, because add_torrent returns existing and frees if already stored memcpy(torrent->hash, hash->value, 20); torrent = add_torrent(d, torrent); - struct peer * peer = calloc(1, sizeof *peer); + struct peer * peer = peer_init(); // same as with torrent memcpy(&peer->addr, &addr, sizeof addr); if (bpath(b, "a/port") && (!bpath(b, "a/implied_port") || !bpath(b, "a/implied_port")->intvalue)) peer->addr.sin6_port = htons(bpath(b, "a/port")->intvalue); - add_peer(d, torrent, peer); - sendb(d, response, &addr); - free_bencoding(response); + peer = add_peer(d, torrent, peer); break; default: // see NOTE01 ; @@ -1986,6 +2084,52 @@ void periodic (struct dht * d) { } } } + if (t->type & info) { + if (t->dl) { + if (seconds() - t->time > DLTO) { + t->dl->flags |= unreachable; + disconnect(t); + } else + goto a; + } + struct peer * p = t->peers; + int c = 0; + while (p) { + if (!(p->flags & (badmeta | nometasupport | unreachable))) + c++; + p = p->next; + } + int s = rand() % c; // OB1 untested + p = t->peers; + while (p) { + if (!(p->flags & (badmeta | nometasupport | unreachable)) && !s--) { + t->dl = p; + t->state = 0; + t->socket = socket(AF_INET6, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, 0); + if (t->socket == -1) { + t->dl = NULL; + L(d->log, "socket: %s", strerror(errno)); + break; + } +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wincompatible-pointer-types" + if (connect(t->socket, &p->addr, sizeof p->addr) == -1) { +#pragma GCC diagnostic pop + if (errno != EAGAIN) { + L(d->log, "connect: %s", strerror(errno)); + disconnect(t); + } + } + t->time = seconds(); + t->size = 0; + t->progress = 0; + d->connection(d, t); + break; + } + p = p->next; + } + } + a: t = t->next; } return; @@ -2019,8 +2163,183 @@ void work (struct dht * d) { else if (ret < 0) { if (errno != EAGAIN) L(d->log, "recvfrom(): %s (%d)", strerror(errno), errno); - else + else { + struct torrent * t = d->torrents; + while (t) { + if (!t->dl) + goto c; + if (!t->state) { + unsigned char handshake[1+19+8+20*2] = "aBitTorrent protocol"; + handshake[0] = 19; + handshake[1+19+5] = 0x10; + memcpy(handshake+1+19+8, t->hash, 20); + memcpy(handshake+1+19+8+20, d->id, 20); + if (send(t->socket, handshake, 1+19+8+20*2, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) + t->state |= handshake_sent; + else + if (errno != EAGAIN) { + L(d->log, "send(): %s", strerror(errno)); + disconnect(t); + goto c; + } + } + if (t->state & handshake_received) { + char e[1024] = "\0\0\0\0\0\0d1:md11:ut_metadatai1eee"; + e[3] = 2+strlen(e+6); + e[4] = 20; + if (send(t->socket, e, e[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) + t->state |= extension_sent; + else + if (errno != EAGAIN) { + L(d->log, "send(): %s", strerror(errno)); + disconnect(t); + goto c; + } + } + if (t->ut_metadata && !(t->dl->flags & requested)) { + char r[1024] = "\0\0\0"; + r[4] = t->ut_metadata; + r[3] = 2+sprintf(r+6, "d8:msg_typei0e5:piecei%uee", t->progress); + if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) != -1) + t->dl->flags |= requested; + else + if (errno != EAGAIN) { + L(d->log, "send(): %s", strerror(errno)); + disconnect(t); + goto c; + } + } + ret = recv(t->socket, packet, 65536, MSG_DONTWAIT | MSG_PEEK); + if (ret < 0) { + if (errno != EAGAIN) { + L(d->log, "recv(TCP, MSG_PEEK): %s (%d)", strerror(errno), errno); + disconnect(t); + } + goto c; + } + if (!t->state && ret >= 1+19+8+20*2) { + recv(t->socket, packet, 1+19+8+20*2, MSG_DONTWAIT); + t->state |= handshake_received; + if (memcmp(packet+1+19+8, t->hash, 20)) { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpointer-sign" + possible_torrent(d, packet+1+19+8); +#pragma GCC diagnostic pop + disconnect(t); + goto c; + } + if (!(packet[1+19+5] & 0x10)) { + t->dl->flags |= nometasupport; + disconnect(t); + goto c; + } + } + uint32_t l = ntohl(*((uint32_t *) packet)); +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wsign-compare" + if (t->state && !packet[0] /* to prevent ultra large packets and sign bit */ && ret >= 4+l) { +#pragma GCC diagnostic pop + recv(t->socket, packet, 4+l, MSG_DONTWAIT); + if (packet[4] == 20) { +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wpointer-sign" + struct bencoding * e = bdecode(packet+6, l-2, replace); +#pragma GCC diagnostic pop + switch (packet[5]) { + case 0: + ; + struct bencoding * ut_metadata = bpath(e, "m/ut_metadata"); + struct bencoding * ut_pex = bpath(e, "m/ut_pex"); + struct bencoding * metadata_size = bpath(e, "metadata_size"); + if (ut_pex && ut_pex->type & num) { + t->ut_pex = ut_pex->intvalue; + t->dl->flags |= pex; + } + if (ut_metadata && ut_metadata->type & num) + t->ut_metadata = ut_pex->intvalue; + if (metadata_size && metadata_size->type & num && metadata_size->intvalue > 0 && metadata_size->intvalue < 100000000) { + t->size = metadata_size->intvalue; + t->metadata = realloc(t->metadata, metadata_size->intvalue); + } else { + if (t->ut_pex) { + // ask for pex + } + disconnect(t); + goto c; + } + break; + case 1: + ; + struct bencoding * msg_type = bpath(e, "msg_type"); + struct bencoding * piece = bpath(e, "piece"); + if (msg_type && msg_type->type & num && piece && piece->type & num) { + switch (msg_type->intvalue) { + case 0: // request, we just reject it + ; + char r[1024] = "\0\0\0"; + r[4] = t->ut_metadata; + r[3] = 2+sprintf(r+6, "d8:msg_typei2e5:piecei%ldee", piece->intvalue); + if (send(t->socket, r, r[3]+4, MSG_DONTWAIT | MSG_NOSIGNAL) == -1 && errno != EAGAIN) { + L(d->log, "send(): %s", strerror(errno)); + disconnect(t); + goto c; + } + break; + case 1: // data + if (piece->intvalue != t->progress) + break; + if (t->size < (t->progress+1)*16384) { + disconnect(t); + L(d->log, "sent more packets than space available. UNREACHABLE!!!"); + goto c; + } + if (!t->metadata) { + disconnect(t); + L(d->log, "i have nowhere to store!"); + goto c; + } + packet[65535] = '\0'; + char * ee = strstr(packet+6, "ee"); + if (!ee) { + disconnect(t); + L(d->log, "malformed packet"); + goto c; + } + memcpy(t->metadata+t->progress++*16384, ee+2, 16384); + if (t->progress*16384 >= t->size) { + SHA1_CTX sha; // TODO SHA256 for torrent v2 + uint8_t results[SHA1_DIGEST_LENGTH]; + SHA1Init(&sha); + SHA1Update(&sha, t->metadata, t->size); + SHA1Final(results, &sha); + if (memcmp(results, t->hash, 20)) { + t->dl->flags |= badmeta; + t->dl->flags &= ~goodmeta; + L(d->log, "received invalid metadata!"); + disconnect(t); + goto c; + } + L(d->log, "received good metadata!"); + disconnect(t); + goto c; + } + t->dl->flags &= ~requested; + break; + case 2: // reject + t->dl->flags |= nometa; + disconnect(t); + goto c; + } + } + break; + } + } + } + c: + t = t->next; + } periodic(d); + } } else { d->rxp++; d->rxb += ret; @@ -85,7 +85,7 @@ int main (int argc, char ** argv) { dht = dht_init(config); dht->possible_torrent = found_torrent; free_bencoding(config); - struct torrent * torrent = calloc(1, sizeof *torrent); + struct torrent * torrent = torrent_init(); memcpy(torrent->hash, "\xdd\x82\x55\xec\xdc\x7c\xa5\x5f\xb0\xbb\xf8\x13\x23\xd8\x70\x62\xdb\x1f\x6d\x1c", 20); torrent->type = announce | peers; add_torrent(dht, torrent); |