Blender  V2.93
device_network.h
Go to the documentation of this file.
1 /*
2  * Copyright 2011-2013 Blender Foundation
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  * http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 
17 #ifndef __DEVICE_NETWORK_H__
18 #define __DEVICE_NETWORK_H__
19 
20 #ifdef WITH_NETWORK
21 
22 # include <boost/archive/binary_iarchive.hpp>
23 # include <boost/archive/binary_oarchive.hpp>
24 # include <boost/archive/text_iarchive.hpp>
25 # include <boost/archive/text_oarchive.hpp>
26 # include <boost/array.hpp>
27 # include <boost/asio.hpp>
28 # include <boost/bind.hpp>
29 # include <boost/serialization/vector.hpp>
30 # include <boost/thread.hpp>
31 
32 # include <deque>
33 # include <iostream>
34 # include <sstream>
35 
36 # include "render/buffers.h"
37 
38 # include "util/util_foreach.h"
39 # include "util/util_list.h"
40 # include "util/util_map.h"
41 # include "util/util_param.h"
42 # include "util/util_string.h"
43 
45 
46 using std::cerr;
47 using std::cout;
48 using std::exception;
49 using std::hex;
50 using std::setw;
51 
52 using boost::asio::ip::tcp;
53 
/* Port numbers and handshake strings shared by the network device code.
 * DISCOVER_PORT is the UDP port ServerDiscovery binds to and broadcasts on.
 * SERVER_PORT is not used in this header — presumably the TCP port of the render
 * server; confirm against the implementation file. */
54 static const int SERVER_PORT = 5120;
55 static const int DISCOVER_PORT = 5121;
/* Datagram payloads of the discovery handshake: the request is broadcast by a
 * discovering ServerDiscovery, the reply is broadcast by instances that are not
 * collecting servers. */
56 static const string DISCOVER_REQUEST_MSG = "REQUEST_RENDER_SERVER_IP";
57 static const string DISCOVER_REPLY_MSG = "REPLY_RENDER_SERVER_IP";
58 
/* Archive types used by RPCSend / RPCReceive. The binary archives are the active
 * choice; flipping the condition below switches both directions to the text
 * archives (both ends must be built with the same setting). */
59 # if 0
60 typedef boost::archive::text_oarchive o_archive;
61 typedef boost::archive::text_iarchive i_archive;
62 # else
63 typedef boost::archive::binary_oarchive o_archive;
64 typedef boost::archive::binary_iarchive i_archive;
65 # endif
66 
67 /* Serialization of device memory */
68 
69 class network_device_memory : public device_memory {
70  public:
71  network_device_memory(Device *device) : device_memory(device, "", MEM_READ_ONLY)
72  {
73  }
74 
75  ~network_device_memory()
76  {
77  device_pointer = 0;
78  };
79 
80  vector<char> local_data;
81 };
82 
/* Common network error function / object for both DeviceNetwork and DeviceServer.
 *
 * Stores the most recently reported error message together with a count of how
 * many errors have been reported since construction. */
class NetworkError {
 public:
  NetworkError() : error(""), error_count(0)
  {
  }

  ~NetworkError()
  {
  }

  /* Record `message` as the latest error and increment the error counter. */
  void network_error(const string &message)
  {
    error = message;
    error_count += 1;
  }

  /* Returns true once at least one error has been reported. */
  bool have_error() const
  {
    return error_count > 0;
  }

 private:
  /* Text of the last reported error (empty until the first report). */
  string error;
  /* Number of network_error() calls so far. */
  int error_count;
};
111 
112 /* Remote procedure call Send */
113 
114 class RPCSend {
115  public:
116  RPCSend(tcp::socket &socket_, NetworkError *e, const string &name_ = "")
117  : name(name_), socket(socket_), archive(archive_stream), sent(false)
118  {
119  archive &name_;
120  error_func = e;
121  fprintf(stderr, "rpc send %s\n", name.c_str());
122  }
123 
124  ~RPCSend()
125  {
126  }
127 
128  void add(const device_memory &mem)
129  {
130  archive &mem.data_type &mem.data_elements &mem.data_size;
131  archive &mem.data_width &mem.data_height &mem.data_depth &mem.device_pointer;
132  archive &mem.type &string(mem.name);
133  archive &mem.interpolation &mem.extension;
134  archive &mem.device_pointer;
135  }
136 
137  template<typename T> void add(const T &data)
138  {
139  archive &data;
140  }
141 
142  void add(const DeviceTask &task)
143  {
144  int type = (int)task.type;
145  archive &type &task.x &task.y &task.w &task.h;
146  archive &task.rgba_byte &task.rgba_half &task.buffer &task.sample &task.num_samples;
147  archive &task.offset &task.stride;
148  archive &task.shader_input &task.shader_output &task.shader_eval_type;
149  archive &task.shader_x &task.shader_w;
150  archive &task.need_finish_queue;
151  }
152 
153  void add(const RenderTile &tile)
154  {
155  archive &tile.x &tile.y &tile.w &tile.h;
156  archive &tile.start_sample &tile.num_samples &tile.sample;
157  archive &tile.resolution &tile.offset &tile.stride;
158  archive &tile.buffer;
159  }
160 
161  void write()
162  {
163  boost::system::error_code error;
164 
165  /* get string from stream */
166  string archive_str = archive_stream.str();
167 
168  /* first send fixed size header with size of following data */
169  ostringstream header_stream;
170  header_stream << setw(8) << hex << archive_str.size();
171  string header_str = header_stream.str();
172 
173  boost::asio::write(
174  socket, boost::asio::buffer(header_str), boost::asio::transfer_all(), error);
175 
176  if (error.value())
177  error_func->network_error(error.message());
178 
179  /* then send actual data */
180  boost::asio::write(
181  socket, boost::asio::buffer(archive_str), boost::asio::transfer_all(), error);
182 
183  if (error.value())
184  error_func->network_error(error.message());
185 
186  sent = true;
187  }
188 
189  void write_buffer(void *buffer, size_t size)
190  {
191  boost::system::error_code error;
192 
193  boost::asio::write(
194  socket, boost::asio::buffer(buffer, size), boost::asio::transfer_all(), error);
195 
196  if (error.value())
197  error_func->network_error(error.message());
198  }
199 
200  protected:
201  string name;
202  tcp::socket &socket;
203  ostringstream archive_stream;
204  o_archive archive;
205  bool sent;
206  NetworkError *error_func;
207 };
208 
209 /* Remote procedure call Receive */
210 
211 class RPCReceive {
212  public:
213  RPCReceive(tcp::socket &socket_, NetworkError *e)
214  : socket(socket_), archive_stream(NULL), archive(NULL)
215  {
216  error_func = e;
217  /* read head with fixed size */
218  vector<char> header(8);
219  boost::system::error_code error;
220  size_t len = boost::asio::read(socket, boost::asio::buffer(header), error);
221 
222  if (error.value()) {
223  error_func->network_error(error.message());
224  }
225 
226  /* verify if we got something */
227  if (len == header.size()) {
228  /* decode header */
229  string header_str(&header[0], header.size());
230  istringstream header_stream(header_str);
231 
232  size_t data_size;
233 
234  if ((header_stream >> hex >> data_size)) {
235 
236  vector<char> data(data_size);
237  size_t len = boost::asio::read(socket, boost::asio::buffer(data), error);
238 
239  if (error.value())
240  error_func->network_error(error.message());
241 
242  if (len == data_size) {
243  archive_str = (data.size()) ? string(&data[0], data.size()) : string("");
244 
245  archive_stream = new istringstream(archive_str);
246  archive = new i_archive(*archive_stream);
247 
248  *archive &name;
249  fprintf(stderr, "rpc receive %s\n", name.c_str());
250  }
251  else {
252  error_func->network_error("Network receive error: data size doesn't match header");
253  }
254  }
255  else {
256  error_func->network_error("Network receive error: can't decode data size from header");
257  }
258  }
259  else {
260  error_func->network_error("Network receive error: invalid header size");
261  }
262  }
263 
264  ~RPCReceive()
265  {
266  delete archive;
267  delete archive_stream;
268  }
269 
270  void read(network_device_memory &mem, string &name)
271  {
272  *archive &mem.data_type &mem.data_elements &mem.data_size;
273  *archive &mem.data_width &mem.data_height &mem.data_depth &mem.device_pointer;
274  *archive &mem.type &name;
275  *archive &mem.interpolation &mem.extension;
276  *archive &mem.device_pointer;
277 
278  mem.name = name.c_str();
279  mem.host_pointer = 0;
280 
281  /* Can't transfer OpenGL texture over network. */
282  if (mem.type == MEM_PIXELS) {
283  mem.type = MEM_READ_WRITE;
284  }
285  }
286 
287  template<typename T> void read(T &data)
288  {
289  *archive &data;
290  }
291 
292  void read_buffer(void *buffer, size_t size)
293  {
294  boost::system::error_code error;
295  size_t len = boost::asio::read(socket, boost::asio::buffer(buffer, size), error);
296 
297  if (error.value()) {
298  error_func->network_error(error.message());
299  }
300 
301  if (len != size)
302  cout << "Network receive error: buffer size doesn't match expected size\n";
303  }
304 
305  void read(DeviceTask &task)
306  {
307  int type;
308 
309  *archive &type &task.x &task.y &task.w &task.h;
310  *archive &task.rgba_byte &task.rgba_half &task.buffer &task.sample &task.num_samples;
311  *archive &task.offset &task.stride;
312  *archive &task.shader_input &task.shader_output &task.shader_eval_type;
313  *archive &task.shader_x &task.shader_w;
314  *archive &task.need_finish_queue;
315 
316  task.type = (DeviceTask::Type)type;
317  }
318 
319  void read(RenderTile &tile)
320  {
321  *archive &tile.x &tile.y &tile.w &tile.h;
322  *archive &tile.start_sample &tile.num_samples &tile.sample;
323  *archive &tile.resolution &tile.offset &tile.stride;
324  *archive &tile.buffer;
325 
326  tile.buffers = NULL;
327  }
328 
329  string name;
330 
331  protected:
332  tcp::socket &socket;
333  string archive_str;
334  istringstream *archive_stream;
335  i_archive *archive;
336  NetworkError *error_func;
337 };
338 
339 /* Server auto discovery */
340 
341 class ServerDiscovery {
342  public:
343  explicit ServerDiscovery(bool discover = false)
344  : listen_socket(io_service), collect_servers(false)
345  {
346  /* setup listen socket */
347  listen_endpoint.address(boost::asio::ip::address_v4::any());
348  listen_endpoint.port(DISCOVER_PORT);
349 
350  listen_socket.open(listen_endpoint.protocol());
351 
352  boost::asio::socket_base::reuse_address option(true);
353  listen_socket.set_option(option);
354 
355  listen_socket.bind(listen_endpoint);
356 
357  /* setup receive callback */
358  async_receive();
359 
360  /* start server discovery */
361  if (discover) {
362  collect_servers = true;
363  servers.clear();
364 
365  broadcast_message(DISCOVER_REQUEST_MSG);
366  }
367 
368  /* start thread */
369  work = new boost::asio::io_service::work(io_service);
370  thread = new boost::thread(boost::bind(&boost::asio::io_service::run, &io_service));
371  }
372 
373  ~ServerDiscovery()
374  {
375  io_service.stop();
376  thread->join();
377  delete thread;
378  delete work;
379  }
380 
381  vector<string> get_server_list()
382  {
384 
385  mutex.lock();
386  result = vector<string>(servers.begin(), servers.end());
387  mutex.unlock();
388 
389  return result;
390  }
391 
392  private:
393  void handle_receive_from(const boost::system::error_code &error, size_t size)
394  {
395  if (error) {
396  cout << "Server discovery receive error: " << error.message() << "\n";
397  return;
398  }
399 
400  if (size > 0) {
401  string msg = string(receive_buffer, size);
402 
403  /* handle incoming message */
404  if (collect_servers) {
405  if (msg == DISCOVER_REPLY_MSG) {
406  string address = receive_endpoint.address().to_string();
407 
408  mutex.lock();
409 
410  /* add address if it's not already in the list */
411  bool found = std::find(servers.begin(), servers.end(), address) != servers.end();
412 
413  if (!found)
414  servers.push_back(address);
415 
416  mutex.unlock();
417  }
418  }
419  else {
420  /* reply to request */
421  if (msg == DISCOVER_REQUEST_MSG)
422  broadcast_message(DISCOVER_REPLY_MSG);
423  }
424  }
425 
426  async_receive();
427  }
428 
429  void async_receive()
430  {
431  listen_socket.async_receive_from(boost::asio::buffer(receive_buffer),
432  receive_endpoint,
433  boost::bind(&ServerDiscovery::handle_receive_from,
434  this,
436  boost::asio::placeholders::bytes_transferred));
437  }
438 
439  void broadcast_message(const string &msg)
440  {
441  /* setup broadcast socket */
442  boost::asio::ip::udp::socket socket(io_service);
443 
444  socket.open(boost::asio::ip::udp::v4());
445 
447  socket.set_option(option);
448 
449  boost::asio::ip::udp::endpoint broadcast_endpoint(
450  boost::asio::ip::address::from_string("255.255.255.255"), DISCOVER_PORT);
451 
452  /* broadcast message */
453  socket.send_to(boost::asio::buffer(msg), broadcast_endpoint);
454  }
455 
456  /* network service and socket */
457  boost::asio::io_service io_service;
458  boost::asio::ip::udp::endpoint listen_endpoint;
459  boost::asio::ip::udp::socket listen_socket;
460 
461  /* threading */
462  boost::thread *thread;
463  boost::asio::io_service::work *work;
465 
466  /* buffer and endpoint for receiving messages */
467  char receive_buffer[256];
468  boost::asio::ip::udp::endpoint receive_endpoint;
469 
470  // os, version, devices, status, host name, group name, ip as far as fields go
471  struct ServerInfo {
472  string cycles_version;
473  string os;
474  int device_count;
475  string status;
476  string host_name;
477  string group_name;
478  string host_addr;
479  };
480 
481  /* collection of server addresses in list */
482  bool collect_servers;
483  vector<string> servers;
484 };
485 
487 
488 #endif
489 
490 #endif /* __DEVICE_NETWORK_H__ */
ThreadMutex mutex
_GL_VOID GLfloat value _GL_VOID_RET _GL_VOID const GLuint GLboolean *residences _GL_BOOL_RET _GL_VOID GLsizei GLfloat GLfloat GLfloat GLfloat const GLubyte *bitmap _GL_VOID_RET _GL_VOID GLenum type
ATTR_WARN_UNUSED_RESULT const BMVert const BMEdge * e
static DBVT_INLINE btScalar size(const btDbvtVolume &a)
Definition: btDbvt.cpp:52
Definition: device.h:293
int stride
Definition: buffers.h:143
int sample
Definition: buffers.h:140
RenderBuffers * buffers
Definition: buffers.h:152
int num_samples
Definition: buffers.h:139
device_ptr buffer
Definition: buffers.h:146
int offset
Definition: buffers.h:142
int resolution
Definition: buffers.h:141
int start_sample
Definition: buffers.h:138
const char * name
MemoryType type
size_t data_height
DataType data_type
device_ptr device_pointer
bool join()
Definition: util_thread.cpp:56
@ MEM_PIXELS
Definition: device_memory.h:41
@ MEM_READ_WRITE
Definition: device_memory.h:37
@ MEM_READ_ONLY
Definition: device_memory.h:36
#define CCL_NAMESPACE_END
__kernel void ccl_constant KernelData ccl_global void ccl_global char ccl_global int ccl_global char ccl_global unsigned int ccl_global float * buffer
#define T
static void error(const char *str)
Definition: meshlaplacian.c:65
static void add(GHash *messages, MemArena *memarena, const Message *msg)
Definition: msgfmt.c:268
struct blender::compositor::@172::@174 task
static const char hex[17]
Definition: thumbs.c:173
__forceinline bool any(const avxb &b)
Definition: util_avxb.h:218
__forceinline const avxi broadcast(const int *ptr)
Definition: util_avxi.h:639
uint len