Question:

I've got an application that needs to send a stream of data from one process to multiple readers, each of which needs to see its own copy of the stream. This is reasonably high-rate (100 MB/s is not uncommon), so I'd like to avoid duplication if possible. In my ideal world, Linux would have named pipes that supported multiple readers, with a fast path for the common single-reader case.

I'd like something that provides some measure of namespace isolation (e.g. broadcasting on 127.0.0.1 is open to any process, I believe). Unix domain sockets don't support broadcast, and UDP is "unreliable" anyway (the server will drop packets instead of blocking in my case). I suppose I could create a shared-memory segment and store the common buffers there, but that feels like reinventing the wheel. Is there a canonical way to do this in Linux?

Answer:

"I suppose I could create a shared-memory segment and store the common buffers there, but that feels like reinventing the wheel. Is there a canonical way to do this in Linux?"

The short answer: No

The long answer: Yes [and you're on the right track]

I've had to do this before [for even higher speeds], so I had to research this. The following is what I came up with.

In the main process, create a pool of shared buffers [use SysV shm or an anonymous shared mmap (MAP_SHARED | MAP_ANONYMOUS), as you choose; a MAP_PRIVATE mapping would become copy-on-write after fork and would not be shared]. Assign ID numbers to them (e.g. 1,2,3,...). Now there is a mapping from bufid to buffer memory address. To make this accessible to child processes, do this before you fork them. The children inherit the shared memory mappings, so there is not much work to do.

Now fork the children. Give each of them a unique process id. You can simply number them incrementally: 2,3,4,... [main is 1] or just use regular pids.

Open up a SysV msg channel (msgget et al.). Again, if you do this in the main process before the fork, the queue ID is available to the children [IIRC].
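The setup steps above can be sketched roughly as follows. This is only an illustration, not the code from the original project: the names `struct pool`, `pool_create`, `pool_buf`, `pool_destroy` and `demo_child_write`, and the pool geometry (`NBUF`, `BUFSZ`), are all made up for the example, and it assumes an anonymous shared mmap rather than SysV shm.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stddef.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/msg.h>
#include <sys/wait.h>

enum { NBUF = 4, BUFSZ = 1 << 20 };   /* hypothetical pool geometry */

struct pool {
    unsigned char *base;   /* start of the shared mapping */
    int msqid;             /* SysV message queue id */
};

/* Create the buffers and the queue. Call BEFORE fork() so children inherit both. */
static int pool_create(struct pool *p)
{
    void *mem = mmap(NULL, (size_t)NBUF * BUFSZ, PROT_READ | PROT_WRITE,
                     MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (mem == MAP_FAILED)
        return -1;
    p->base = mem;
    p->msqid = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (p->msqid < 0) {
        munmap(mem, (size_t)NBUF * BUFSZ);
        return -1;
    }
    return 0;
}

/* Map a bufid (1..NBUF) to its address; NULL if the id is out of range. */
static unsigned char *pool_buf(const struct pool *p, int bufid)
{
    assert(p != NULL && p->base != NULL);
    if (bufid < 1 || bufid > NBUF)
        return NULL;
    return p->base + (size_t)(bufid - 1) * BUFSZ;
}

static void pool_destroy(struct pool *p)
{
    msgctl(p->msqid, IPC_RMID, NULL);
    munmap(p->base, (size_t)NBUF * BUFSZ);
}

/* Fork a child that writes one byte into a buffer; return what the parent sees. */
static int demo_child_write(const struct pool *p, int bufid, unsigned char value)
{
    unsigned char *buf = pool_buf(p, bufid);
    if (buf == NULL)
        return -1;
    pid_t pid = fork();
    if (pid < 0)
        return -1;
    if (pid == 0) {            /* child: same mapping, same address */
        buf[0] = value;
        _exit(0);
    }
    waitpid(pid, NULL, 0);
    return buf[0];
}
```

Because the mapping is created with MAP_SHARED before the fork, a bufid resolves to the same address in every process, so only the small integer ever has to travel over the message queue.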


Now here's how it works:

main finds an unused buffer and fills it. For each child, main sends an IPC message via msgsnd (on the single common IPC channel) where the message payload [mtext] is the bufid number. Each message has the standard header's mtype field set to the destination child's pid.

After doing this, main remembers the buffer as "in flight" and not yet reusable.

Each child does a msgrcv with the mtype set to its pid. It then extracts the bufid from mtext and processes the buffer. When it's done, it sends an IPC message [again on the same channel] with mtype set to main's pid with an mtext of the bufid it just processed.

main's loop does a non-blocking msgrcv, noting all "release" messages for a given bufid. When all children have released the buffer, it's put back on the buffer "free queue". In main's service loop, it may fill new buffers and send more messages as appropriate [intersperse with the waits].

The child then does another msgrcv and the cycle repeats.

So, we're using [large] shared memory buffers and short [a few bytes] bufid descriptor IPC messages.
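To make the cycle concrete, here is a minimal sketch of the whole loop under some simplifying assumptions: logical ids (main is 1, children are 2, 3, ...) instead of real pids, a single queue, a bufid of 0 used as a made-up "stop" signal, and a blocking msgrcv in main because this toy has nothing else to do while waiting. All names (`run_broadcast`, `child_loop`, `send_id`, `recv_id`) and sizes are hypothetical.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <string.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/mman.h>
#include <sys/msg.h>
#include <sys/wait.h>

enum { NBUF = 4, BUFSZ = 4096, MAIN_ID = 1, STOP = 0 };  /* hypothetical values */

struct idmsg { long mtype; int bufid; };   /* the payload is only the bufid */

static int send_id(int q, long to, int bufid)
{
    struct idmsg m = { to, bufid };
    return msgsnd(q, &m, sizeof m.bufid, 0);
}

/* Blocking receive of the next message addressed to `me`; -1 on error. */
static int recv_id(int q, long me)
{
    struct idmsg m;
    return msgrcv(q, &m, sizeof m.bufid, me, 0) < 0 ? -1 : m.bufid;
}

static void child_loop(int q, const unsigned char *pool, long me)
{
    for (;;) {
        int id = recv_id(q, me);
        if (id == STOP) _exit(0);
        if (id < 0) _exit(1);
        /* "Process" the buffer: here we only check main's fill pattern. */
        if (pool[(size_t)(id - 1) * BUFSZ] != (unsigned char)id) _exit(2);
        send_id(q, MAIN_ID, id);           /* release the buffer back to main */
    }
}

/* Broadcast `rounds` buffers to `nchildren` readers.
   Returns the number of buffers recycled, or -1 on error. */
int run_broadcast(int nchildren, int rounds)
{
    if (nchildren < 1) return -1;
    size_t len = (size_t)NBUF * BUFSZ;
    unsigned char *pool = mmap(NULL, len, PROT_READ | PROT_WRITE,
                               MAP_SHARED | MAP_ANONYMOUS, -1, 0);
    if (pool == MAP_FAILED) return -1;
    int q = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (q < 0) { munmap(pool, len); return -1; }

    int pending[NBUF + 1] = { 0 };         /* releases still owed per bufid */
    int sent = 0, recycled = 0, ok = 1;

    for (int c = 0; c < nchildren; c++) {
        pid_t pid = fork();
        if (pid == 0) child_loop(q, pool, MAIN_ID + 1 + c);   /* never returns */
        if (pid < 0) { msgctl(q, IPC_RMID, NULL); return -1; }
    }
    while (recycled < rounds) {
        for (int id = 1; id <= NBUF && sent < rounds; id++) {
            if (pending[id]) continue;     /* still in flight */
            memset(pool + (size_t)(id - 1) * BUFSZ, id, BUFSZ);
            for (int c = 0; c < nchildren; c++)
                send_id(q, MAIN_ID + 1 + c, id);
            pending[id] = nchildren;
            sent++;
        }
        int id = recv_id(q, MAIN_ID);      /* wait for one release message */
        if (id < 1) { ok = 0; break; }
        assert(pending[id] > 0);
        if (--pending[id] == 0) recycled++;    /* back on the free list */
    }
    for (int c = 0; c < nchildren; c++)
        send_id(q, MAIN_ID + 1 + c, STOP);
    for (int c = 0; c < nchildren; c++) {
        int st;
        if (wait(&st) < 0 || !WIFEXITED(st) || WEXITSTATUS(st) != 0) ok = 0;
    }
    msgctl(q, IPC_RMID, NULL);
    munmap(pool, len);
    return ok ? recycled : -1;
}
```

Calling `run_broadcast(3, 10)` pushes ten buffers through three readers using only four physical buffers. A real main loop would use IPC_NOWAIT on the release receive so it can keep producing data while buffers are in flight.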


Okay, so the question you may be asking: "Why SysV IPC for the comm channel?" [vs. multiple pipes or sockets].

You already know that a shared buffer avoids sending multiple copies of your data.

So, that's the way to go. But, why not send the above bufid messages across sockets or pipes [or shared queues, condition variables, mutexes, etc]?

The answer is speed and the wakeup characteristics of the target process.

For a highly realtime response, when main sends out the bufid messages, you want the target process [if it's been sleeping] to wake up immediately and start processing the buffer.

I examined the Linux kernel source and the only mechanism that has that characteristic is SysV IPC. All others have a [scheduling] lag.

When process A does msgsnd on a channel that process B has done msgrcv on, three things will happen:

  1. process B will be marked runnable by the scheduler.
  2. [IIRC] B will be moved to the front of its scheduling queue
  3. Also, more importantly, this then causes an immediate reschedule of all processes.

B will start right away [as opposed to next timer interrupt or when some other process just happens to sleep]. On a single core machine, A will be put to sleep and B will run in its stead.

Caveat: All my research was done a few years back before the CFS scheduler, but, I believe the above should still hold. Also, I was using the RT scheduler, which may be a possible option if CFS doesn't work as intended.


UPDATE:

"Looking at the POSIX message queue source, I think that the same immediate-wakeup behavior you discussed with the System V queues is going on, which gives the added benefit of POSIX compatibility."

The timing semantics are possible [and desirable] so I wouldn't be surprised. But, SysV is actually more standard and ubiquitous than POSIX mqueues. And, there are some semantic differences [See below].

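For timing, you can build a unit test program [just using msgs] with nsec timestamps. I used TSC stamps, but clock_gettime(CLOCK_REALTIME,...) might also work [CLOCK_MONOTONIC is usually the safer choice for intervals, since it is not affected by wall-clock adjustments]. Stamp the departure time and the arrival/wakeup time and look at the difference. Compare both SysV and mq.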
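A timing test along those lines might look like the sketch below, shown for the SysV side only. It is an assumption-laden toy: it uses clock_gettime(CLOCK_MONOTONIC) rather than TSC stamps, takes a single sample, and uses a 10 ms sleep as a crude way to let the child block in msgrcv first. The names `sysv_wakeup_latency_ns`, `now_ns` and `struct tsmsg` are invented for the example.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <stdint.h>
#include <string.h>
#include <time.h>
#include <unistd.h>
#include <sys/ipc.h>
#include <sys/msg.h>
#include <sys/wait.h>

struct tsmsg { long mtype; char mtext[sizeof(int64_t)]; };

static int64_t now_ns(void)
{
    struct timespec ts;
    int rc = clock_gettime(CLOCK_MONOTONIC, &ts);
    assert(rc == 0);
    (void)rc;
    return (int64_t)ts.tv_sec * 1000000000LL + ts.tv_nsec;
}

/* One sample of send-to-wakeup latency over a SysV queue, in ns; -1 on error. */
int64_t sysv_wakeup_latency_ns(void)
{
    int q = msgget(IPC_PRIVATE, IPC_CREAT | 0600);
    if (q < 0) return -1;

    pid_t pid = fork();
    if (pid < 0) { msgctl(q, IPC_RMID, NULL); return -1; }
    if (pid == 0) {
        struct tsmsg m;
        int64_t t0, dt;
        if (msgrcv(q, &m, sizeof m.mtext, 1, 0) < 0) _exit(1);
        dt = now_ns();                       /* arrival/wakeup stamp */
        memcpy(&t0, m.mtext, sizeof t0);     /* departure stamp from parent */
        dt -= t0;
        m.mtype = 2;                         /* reply on a different mtype */
        memcpy(m.mtext, &dt, sizeof dt);
        _exit(msgsnd(q, &m, sizeof m.mtext, 0) < 0 ? 1 : 0);
    }

    usleep(10000);                           /* let the child block in msgrcv */
    struct tsmsg m = { .mtype = 1 };
    int64_t t0 = now_ns(), dt = -1;          /* departure stamp, just before msgsnd */
    memcpy(m.mtext, &t0, sizeof t0);
    if (msgsnd(q, &m, sizeof m.mtext, 0) == 0 &&
        msgrcv(q, &m, sizeof m.mtext, 2, 0) >= 0)
        memcpy(&dt, m.mtext, sizeof dt);

    msgctl(q, IPC_RMID, NULL);               /* also unblocks the child on failure */
    waitpid(pid, NULL, 0);
    return dt;
}
```

For a meaningful comparison you would collect many samples, look at the distribution rather than one value, and repeat the same measurement with mq_send/mq_receive under the same scheduler settings.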

With either SysV or mq you may need to bump up the max # of msgs, max msg size, and max # of queues via /proc [on Linux: /proc/sys/kernel/msgmax, msgmnb and msgmni for SysV, and /proc/sys/fs/mqueue/* for POSIX mqueues]. The default values are relatively small. If you don't, you may find tasks blocked waiting for a msg while the master can't send one [it is blocked itself] because a msg queue maximum has been exceeded. I actually had such a bug, so I changed my code to bump up these values [it was running as root] during startup. Otherwise, you may need to do this from an RC boot script (or whatever the [atrocious ;-)] systemd equivalent is).
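Before raising anything, it can help to see what the current limits are. The sketch below reads the Linux sysctl files for the SysV and POSIX message queue limits; `read_limit` and `print_msg_limits` are hypothetical helper names, and a missing or unreadable file is simply reported as -1.

```c
#include <assert.h>
#include <stdio.h>

/* Read one integer limit from a /proc/sys file; -1 if it cannot be read. */
long read_limit(const char *path)
{
    assert(path != NULL);
    FILE *f = fopen(path, "r");
    long v = -1;
    if (f == NULL)
        return -1;
    if (fscanf(f, "%ld", &v) != 1)
        v = -1;
    fclose(f);
    return v;
}

/* Print the current SysV and POSIX mqueue limits (Linux-specific paths). */
void print_msg_limits(void)
{
    static const char *paths[] = {
        "/proc/sys/kernel/msgmax",          /* max bytes in one SysV message */
        "/proc/sys/kernel/msgmnb",          /* default max bytes per SysV queue */
        "/proc/sys/kernel/msgmni",          /* max number of SysV queues */
        "/proc/sys/fs/mqueue/msg_max",      /* max messages per POSIX queue */
        "/proc/sys/fs/mqueue/msgsize_max",  /* max POSIX message size */
        "/proc/sys/fs/mqueue/queues_max",   /* max number of POSIX queues */
    };
    for (size_t i = 0; i < sizeof paths / sizeof paths[0]; i++)
        printf("%-34s %ld\n", paths[i], read_limit(paths[i]));
}
```

Raising the values means writing to the same files (or using sysctl) with sufficient privileges, which is why doing it at startup as root or from a boot script comes up.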

I looked at using mq to replace SysV in my own code. It didn't have the same semantics for a many-to-one return-to-free-pool msg. In my original answer, I had forgotten to mention that two msg queues are needed: master-to-children (e.g. work-to-do) and children-to-master (e.g. returning a now available buffer).

I had several different types of buffers (e.g. compressed video, compressed audio, uncompressed video, uncompressed audio) that had varying types and struct descriptors.

There were also multiple different buffer queues, as these buffers got passed from thread to thread [different processing stages].

With SysV you can use a single msg queue for multiple buffer lists/queues: the buffer list ID is the msg mtype. A child's msgrcv waits with mtype set to the ID value. The master waits on the return-to-free msg queue with an mtype of 0, which matches the first message of any type.
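The mtype selection can be illustrated with a small sketch. The helper names `put_buf` and `get_buf` and the `struct listmsg` layout are made up for this example; the receive is non-blocking (IPC_NOWAIT) only so that the example cannot hang.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <sys/ipc.h>
#include <sys/msg.h>

struct listmsg { long mtype; int bufid; };

/* Queue `bufid` on buffer list `list_id` (an mtype, so it must be > 0). */
int put_buf(int q, long list_id, int bufid)
{
    assert(list_id > 0);
    struct listmsg m = { list_id, bufid };
    return msgsnd(q, &m, sizeof m.bufid, 0);
}

/* Take the oldest bufid from `list_id`, or from any list if list_id == 0.
   Non-blocking: returns -1 when no matching message is queued. */
int get_buf(int q, long list_id)
{
    struct listmsg m;
    if (msgrcv(q, &m, sizeof m.bufid, list_id, IPC_NOWAIT) < 0)
        return -1;
    return m.bufid;
}
```

With messages queued as (list 1, buf 10), (list 2, buf 20), (list 1, buf 11), a call to `get_buf(q, 2)` skips past the list-1 entry and returns 20, while `get_buf(q, 0)` returns whatever is oldest overall.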

mq* requires a separate mqd_t for each ID because it doesn't allow a wait on a msg subtype.

msgrcv allows IPC_NOWAIT on each call, but to get the same effect with mq_receive you have to open the queue with O_NONBLOCK or use the timed version. This gets used during the "shutdown" or "restart" phase (e.g. send a msg to children that no more data will arrive and they should terminate [or reconfigure, etc.]). The IPC_NOWAIT is handy for "draining" a queue during program startup [to get rid of stale messages from a prior invocation] or drain stale messages from a prior configuration during operation.
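Draining with IPC_NOWAIT could look something like this. It is a sketch, not the original code: `drain_queue` is a hypothetical name, and MSG_NOERROR is added here so that stale messages larger than the scratch buffer are truncated and discarded instead of failing with E2BIG.

```c
#define _GNU_SOURCE
#include <assert.h>
#include <errno.h>
#include <sys/ipc.h>
#include <sys/msg.h>

/* Discard every message currently on queue `q`, of any mtype.
   Returns the number of messages removed, or -1 on an unexpected error. */
int drain_queue(int q)
{
    struct { long mtype; char mtext[64]; } m;
    int n = 0;

    assert(q >= 0);
    for (;;) {
        /* msgtyp 0 = first message of any type; never block. */
        if (msgrcv(q, &m, sizeof m.mtext, 0, IPC_NOWAIT | MSG_NOERROR) >= 0) {
            n++;
            continue;
        }
        if (errno == EINTR)
            continue;                      /* interrupted by a signal: retry */
        return errno == ENOMSG ? n : -1;   /* ENOMSG means the queue is empty */
    }
}
```

Running this once at startup, before any children are forked, clears out leftovers from a previous invocation that used the same queue key.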

So, instead of just two SysV msg queues to handle an arbitrary number of buffer lists, you'll need a separate mqd_t for each buffer list/type.
