RIT VEXU Core API
Loading...
Searching...
No Matches
registry-listener.hpp
1#pragma once
2#include "protocol.hpp"
3#include <deque>
4
5namespace VDP {
9template <typename MutexType> class RegistryListener {
10public:
11 int num_bad = 0;
12 int num_small = 0;
13 using CallbackFn = std::function<void(const VDP::Channel &)>;
20 RegistryListener(AbstractDevice *device) : device(device) {
21 device->register_receive_callback([&](const Packet &p) {
22 printf("Listener: GOT PACKET\n");
23 take_packet(p);
24 });
25 };
26
31 void take_packet(const Packet &pac) {
32 VDPTracef("Received packet of size %d", (int)pac.size());
33 // checks the validity of the packet
34 const VDP::PacketValidity status = validate_packet(pac);
35
36 if (status == VDP::PacketValidity::BadChecksum) {
37 VDPWarnf("Listener: Bad packet checksum. Skipping");
38 num_bad++;
39 return;
40 } else if (status == VDP::PacketValidity::TooSmall) {
41 num_small++;
42 VDPWarnf("Listener: Packet too small to be valid (%d bytes). Skipping",
43 (int)pac.size());
44 return;
45 } else if (status != VDP::PacketValidity::Ok) {
46 VDPWarnf("Listener: Unknown validity of packet (BAD). Skipping");
47 return;
48 }
49 // checks the packet function from the header
50 const VDP::PacketHeader header = VDP::decode_header_byte(pac[0]);
51 if (header.func == VDP::PacketFunction::Send) {
52 VDPTracef("Listener: PacketFunction Send");
53
54 if (header.type == VDP::PacketType::Data) {
55 // if the packet is a data, get the data from the packet
56 VDPTracef("Listener: PacketType Data");
57 // get the channel id from the second byte of the packet
58 const ChannelID id = pac[1];
59 // stores the channel id's schema in a Part Pointer
60 const PartPtr part = get_remote_schema(id);
61 if (part == nullptr) {
62 VDPDebugf("VDB-Listener: No channel information for id: %d", id);
63 return;
64 }
65 // creates a PacketReader starting after the channel id location
66 PacketReader reader{pac, 2};
67 // stores the data read from the packet to the Registry Part
68 part->read_data_from_message(reader);
69 // runs the channel's on data callback
70 on_data(Channel{part, id});
71 } else if (header.type == VDP::PacketType::Broadcast) {
72 printf("got broadcast packet\n");
73 // if the packet is a broadcast, decode the packet
74 VDPTracef("Listener: PacketType Broadcast", "");
75 auto decoded = VDP::decode_broadcast(pac);
76 // create a channel and give it the decoded packet
77 VDP::Channel chan{decoded.second, decoded.first};
78 // checks if the new channel is outside of the vector of remote channels
79 if (channels.size() < chan.id) {
80 VDPWarnf("Listener: Out of order broadcast. dropping");
81 return;
82 }
83 // adds the channel to the vector of remote channels
84 channels.push_back(chan);
85 VDPTracef("Listener: Got broadcast of channel %d", int(chan.id));
86 // runs the channel's on broadcast callback
87 on_broadcast(chan);
88
89 // creates a packet and writes the channel acknowledgement to it,
90 // then sends it to the device
91 Packet scratch;
92 PacketWriter writer{scratch};
93 writer.write_channel_acknowledge(chan);
94 device->send_packet(writer.get_packet());
95 printf("Listener: sent channel ack\n");
96 }
97 } else if (header.func == VDP::PacketFunction::Request) {
98 printf("got request packet\n");
99 // if the packet is a data, get the data from the packet
100 VDPTracef("Listener: PacketType Request");
101 // creates a PacketReader starting after the channel id location
102 if(channel_response_queue.size() > 0){
103 Packet scratch;
104 PacketWriter writer{scratch};
105 response_queue_mutex.lock();
106 writer.write_response(channel_response_queue);
107 response_queue_mutex.unlock();
108 device->send_packet(writer.get_packet());
109 printf("Listener: sent available data\n");
110 }
111 else{
112 printf("no data available for response\n");
113 }
114
115 }
116 };
117
123 bool submit_response(PacketType type, ChannelID id, PartPtr data) {
124 if (type != PacketType::Data) {
125 printf("packet type is not data, not usable data\n");
126 return false;
127 }
128 VDP::Channel channel_response = channels[id];
129 channel_response.data = data;
130 if (channels.size() < id) {
131 printf("cannot respond to channel: %d, channel does not exist\n", id);
132 return false;
133 }
134 response_queue_mutex.lock();
135 channel_response_queue.push_back(channel_response);
136 response_queue_mutex.unlock();
137 return true;
138 };
139
140 PartPtr get_remote_schema(ChannelID id) {
141 if (id >= channels.size()) {
142 return nullptr;
143 }
144 return channels[id].data;
145 };
152 void install_broadcast_callback(CallbackFn on_broadcastf) {
153 VDPTracef("Listener: Installed broadcast callback for ");
154 this->on_broadcast = (on_broadcastf);
155 };
156
161 void install_data_callback(CallbackFn on_dataf) {
162 VDPTracef("Listener: Installed data callback for ");
163 this->on_data = (on_dataf);
164 };
165
171 bool send_data(ChannelID id, PartPtr data) {
172 // checks if the channel is actually stored in the Registry
173 if (id > channels.size()) {
174 printf("VDB-Listener: Channel with ID %d doesn't exist yet\n", (int)id);
175 return false;
176 }
177 // sets the channel's data to the Part Pointer given
178 Channel &chan = channels[id];
179 chan.data = data;
180 // checks if the channel has been acknowledged yet
181 if (!chan.acked) {
182 printf("VDB-Listener: Channel %d has not yet been negotiated. Dropping "
183 "packet\n",
184 (int)id);
185 return false;
186 }
187 // if it has been acknowledged write the channel's data to a packet and send
188 // it to the device
189 VDP::Packet scratch;
190 PacketWriter writ{scratch};
191
192 writ.write_data_message(chan);
193 VDP::Packet pac = writ.get_packet();
194
195 return device->send_packet(pac);
196 };
197
202private:
203 ChannelID new_channel_id() {
204 ChannelID id = next_channel_id;
205 next_channel_id++;
206 return id;
207 }
208 bool needs_ack = false;
209 static constexpr size_t ack_ms = 500;
210
211 AbstractDevice *device;
212 // Our channels (us -> them)
213 std::vector<Channel> channels;
214 ChannelID next_channel_id = 0;
215 std::deque<Channel> chans_to_send;
216
217 // The channels we know about from the other side
218 // (them -> us)
219 std::deque<Channel> channel_response_queue;
220
221 MutexType response_queue_mutex;
222
223 CallbackFn on_broadcast = [&](VDP::Channel chan) {
224 std::string schema_str = chan.data->pretty_print();
225 printf("VDB-Listener: No Broadcast Callback installed: Received broadcast "
226 "for channel id "
227 "%d:\n%s\n",
228 int(chan.id), schema_str.c_str());
229 };
230 CallbackFn on_data = [](VDP::Channel chan) {
231 printf("VDB Listener: No Data Callback installed: got data for channel "
232 "%d:\n%s\n",
233 int(chan.id), chan.data->pretty_print_data().c_str());
234 };
235 CallbackFn on_rec = [](VDP::Channel chan) {
236 printf(
237 "VDB Listener: No Data Callback installed: Received data for channel "
238 "%d:\n%s\n",
239 int(chan.id), chan.data->pretty_print_data().c_str());
240 };
241};
242} // namespace VDP
Definition protocol.hpp:369
Definition protocol.hpp:293
void write_data_message(const Channel &part)
Definition protocol.cpp:272
void write_channel_acknowledge(const Channel &chan)
Definition protocol.cpp:235
void write_response(std::deque< Channel > &channels)
Definition protocol.cpp:308
bool submit_response(PacketType type, ChannelID id, PartPtr data)
Submits a channel to respond to the board with.
Definition registry-listener.hpp:123
void install_data_callback(CallbackFn on_dataf)
Definition registry-listener.hpp:161
void install_broadcast_callback(CallbackFn on_broadcastf)
Definition registry-listener.hpp:152
void take_packet(const Packet &pac)
Call this if you are a device who has a packet for the protocol to decode.
Definition registry-listener.hpp:31
bool send_data(ChannelID id, PartPtr data)
Definition registry-listener.hpp:171
RegistryListener(AbstractDevice *device)
Definition registry-listener.hpp:20
Definition protocol.hpp:105